Module vipy.util

Expand source code Browse git
import urllib.request
import urllib.parse
import urllib.error
from urllib.parse import urlparse
from os import chmod
import os.path
import numpy as np
import tempfile
import time
from time import gmtime, strftime, localtime
from datetime import datetime
import sys
import csv
import hashlib
import shutil
import re
import uuid
import dill  
import builtins
import pickle as cPickle
import PIL
import matplotlib.pyplot as plt
from itertools import groupby as itertools_groupby
from itertools import tee, chain
import importlib
import pathlib
import socket
import warnings
import copy
import bz2


try:
    import ujson as json  # faster
except ImportError:
    import json
    
def class_registry():
    """Return a dictionary mapping str(type(obj)) to a JSON loader for all vipy objects.

    This function is useful for JSON loading of vipy objects to map to the correct deserialization method.

    Returns:
        A dictionary whose keys are strings of the form "<class 'module.Class'>" and whose values are callables that
        deserialize the JSON payload stored for that class.  The key None maps to `pickle.loads` as a generic fallback.
        The loaders registered for the optional pycollector classes raise ValueError when called if the package could not be imported.
    """

    import vipy.video
    import vipy.image
    import vipy.dataset
    import vipy.geometry   # imported explicitly since they are referenced below
    import vipy.object
    import vipy.activity

    def _unavailable(message):
        """Return a loader that raises ValueError(message) as soon as it is called"""
        def _loader(x):
            raise ValueError(message)
        return _loader

    registry = {"<class 'vipy.video.Scene'>":vipy.video.Scene.from_json,
                "<class 'vipy.video.Video'>":vipy.video.Video.from_json,
                "<class 'vipy.video.VideoCategory'>":vipy.video.VideoCategory.from_json,
                "<class 'vipy.image.Image'>":vipy.image.Image.from_json,
                "<class 'vipy.image.ImageCategory'>":vipy.image.ImageCategory.from_json,
                "<class 'vipy.image.ImageDetection'>":vipy.image.ImageDetection.from_json,            
                "<class 'vipy.image.Scene'>":vipy.image.Scene.from_json,
                "<class 'vipy.geometry.BoundingBox'>":vipy.geometry.BoundingBox.from_json,
                "<class 'vipy.object.Track'>":vipy.object.Track.from_json,
                "<class 'vipy.object.Detection'>":vipy.object.Detection.from_json,
                "<class 'vipy.activity.Activity'>":vipy.activity.Activity.from_json,
                "<class 'vipy.dataset.Dataset'>":vipy.dataset.Dataset.from_json}

    # Optional packages: register a loader that fails with a helpful message only when it is actually needed.
    # Catch Exception (not a bare except) so that KeyboardInterrupt/SystemExit are not swallowed.
    try:
        import pycollector.video
        registry.update( {"<class 'pycollector.video.Video'>":pycollector.video.Video.from_json} )
    except Exception:
        registry.update( {"<class 'pycollector.video.Video'>":_unavailable("<class 'pycollector.video.Video'> not found - Run 'pip install pycollector' ")} )
    try:
        import pycollector.admin.video
        registry.update( {"<class 'pycollector.admin.video.Video'>":pycollector.admin.video.Video.from_json} )
    except Exception:
        registry.update( {"<class 'pycollector.admin.video.Video'>":_unavailable("<class 'pycollector.admin.video.Video'> not found - This is for admin use only ")} )

    # NOTE: unpickling can execute arbitrary code - only use this fallback on trusted data
    registry.update( {None: cPickle.loads} )  # fallback on generic pickle dumps
    return registry
            

def save(vars, outfile=None, backup=False):
    """Save variables to an archive file.

    This function allows vipy objects to be serialized to disk for later loading.

    ```python
    im = vipy.image.owl()
    im = vipy.util.load(vipy.util.save(im))   # round trip
    ```

    Args:
        vars: A python object to save.  This can be any serializable python object
        outfile:  An output file to save.  Must have extension [.pkl, .json, .pkl.bz2].  If None, will save to a temporary JSON file.
        backup [bool]:  If true and the outfile already exists, make a copy and save as outfile.bak before overwriting

    Returns:
        A path to the saved archive file.  Load using `vipy.util.load`. 

    Raises:
        ValueError: If the extension of outfile is not one of the allowable extensions

    .. note:: JSON is preferred as an archive format for vipy.  Be sure to install the excellent ultrajson library (pip install ujson) for fast serialization.
    """
    allowable = set(['.pkl', '.json', '.pkl.bz2'])
    outfile = tempjson() if outfile is None else outfile

    if backup and os.path.exists(outfile):
        shutil.copyfile(outfile, outfile+'.bak')
    remkdir(filepath(outfile))
    if ispkl(outfile):
        # Use a context manager so that the file is flushed and closed even if pickling fails
        with open(outfile, 'wb') as f:
            dill.dump(vars, f)

    elif isjsonfile(outfile):
        saveobj = vars
        registry = class_registry()
        if isinstance(saveobj, list) and all([str(type(d)) in registry for d in saveobj]):
            # A list of registered objects: each element is stored as {class-string: json}
            j = [{str(type(d)):d.json(encode=False)} for d in saveobj]
        elif str(type(saveobj)) in registry:
            # A single registered object
            j = {str(type(saveobj)):saveobj.json(encode=False)}
        else:
            # Anything else is handed to the JSON encoder unchanged
            j = saveobj

        s = json.dumps(j, ensure_ascii=False)  # load to memory (faster than json.dump), will throw exception if it cannot serialize
        with open(outfile, 'w') as f:
            f.write(s)            

    elif ispklbz2(outfile):
        # NOTE(review): bz2pkl is not defined in the visible part of this file (pklbz2 is) - confirm that it exists
        return bz2pkl(outfile, vars)
    else:
        raise ValueError('Unknown file extension for save file "%s" - must be in %s' % (fileext(outfile), str(allowable)))
    
    return os.path.abspath(outfile)


def load(infile, abspath=True, refcycle=True):
    """Load variables from a relocatable archive file format, either dill pickle, JSON format or JSON directory format.
       
       Loading is performed by attempting the following:

       1. If the input file is a directory, return a `vipy.dataset.Dataset` with lazy loading of all pkl or json files recursively discovered in this directory.
       2. If the input file is a pickle or json file, load it
       3. if abspath=true, then convert relative paths to absolute paths for object when loaded
       4. If refcycle=False, then disable the python reference cycle garbage collector for large archive files
    
    ```python
    im = vipy.image.owl()
    f = vipy.util.save(im)
    im = vipy.util.load(f)
    ```

       Args:
           infile: [str] file saved using `vipy.util.save` with extension [.pkl, .json].  This may also be a directory tree containing json or pkl files 
           abspath: [bool] If true, then convert all vipy objects with relative paths to absolute paths. If False, then preserve relative paths and warn user.
           refcycle: [bool] If False, then disable python reference cycle garbage collector.  This is useful for large python objects.
       
       Returns:
           The object in the archive file

       Raises:
           ValueError: If infile is neither a recognized archive file nor a directory

    .. note:: Pickle archives can execute arbitrary code when loaded.  Only load archives from trusted sources.
    """
    infile = os.path.abspath(os.path.expanduser(infile))

    if ispkl(infile):
        # Use a context manager so that the file handle is closed even if unpickling fails
        with open(infile, 'rb') as f:
            obj = dill.load(f)
    elif isjsonfile(infile):
        with open(infile, 'r') as f:
            loadobj = json.load(f)
        registry = class_registry()
        assert isinstance(loadobj, list) or isinstance(loadobj, dict), "invalid vipy JSON serialization format"
        if isinstance(loadobj, list) and all([isinstance(d, dict) for d in loadobj]) and all([c in registry for d in loadobj for (c,v) in d.items()]):
            # A list of {class-string: json} dictionaries
            obj = [registry[c](v) for d in loadobj for (c,v) in d.items()]
        elif isinstance(loadobj, dict) and all([c in registry for (c,d) in loadobj.items()]):
            # A single {class-string: json} dictionary
            obj = [registry[c](v) for (c,v) in loadobj.items()]
            obj = obj[0] if len(obj) == 1 else obj
        else:
            # Generic JSON that is not a vipy serialization
            obj = loadobj
    elif isbz2(infile):
        return bz2pkl(infile)
    elif os.path.isdir(infile):        
        import vipy.dataset
        return vipy.dataset.Dataset(infile)
    else:
        raise ValueError('unknown file type')
    
    if len(tolist(obj)) == 0:
        return obj  
    testobj = tolist(obj)[0]  # the first element decides how paths are handled for the whole archive

    # Relocatable object?
    if hasattr(testobj, 'filename') and testobj.filename() is not None:
        if not os.path.isabs(testobj.filename()):
            if not abspath:
                warnings.warn('Loading archive "%s" with relative paths.  Changing directory to "%s".  Disable this warning with vipy.util.load(..., abspath=True).' % (infile, filepath(infile)))
                os.chdir(filepath(infile))
            else:
                # Absolute path?  The loaded archive will no longer be relocatable if you save this to a new archive, and the videos directory cannot be moved
                pwd = os.getcwd()  # save current directory
                os.chdir(filepath(infile))  # change to archive directory
                try:
                    objout = [o.abspath() if o.filename() is not None else o for o in tolist(obj)]  # set absolute paths relative to archive directory
                finally:
                    os.chdir(pwd)  # always restore the current directory, even if abspath() raises
                obj = objout if isinstance(obj, list) else objout[0]
        elif not testobj.hasfilename():
            warnings.warn('Loading "%s" that contains path (e.g. "%s") which does not exist' % (infile, testobj.filename()))

    # Large vipy object?  Disable garbage collection.
    #   - Python uses reference counting for the primary garbage collection mechanism, but also uses reference cycle checks to search for dependencies between objects.
    #   - All vipy objects are self contained, and do not have reference cycles.  However, there is no way to mark an individual object which does not participate in reference cycle counting.
    #   - This means that for a large number of vipy objects, garbage collection can take minutes searching for cycles which are never there.  To fix this, globally disable the garbage collector.
    #   - Note that reference counting is still performed, we are just disabling reference *cycle* counting using the generational garbage collector.
    #   - This can be re-enabled at any time by "import gc; gc.enable()"
    #   - If you use %autoreload iPython magic command, note that this will be very slow.  You should set %autoreload 0
    #   - Alternatively, load as JSON and all attributes will be unpacked on demand and stored in a packed format that is not tracked (e.g. tuple of strings) by the reference cycle counter
    if not refcycle:
        warnings.warn('Disabling python reference cycle garbage collection.  Re-enable at any time using "import gc; gc.enable()"')
        import gc; gc.disable()
    return obj


def dirload(indir):
    """Load every archive found recursively under indir and concatenate the loaded elements into one flat list.

    Each archive returned by `findloadable` is loaded with `load` and its result is iterated, so every archive is expected to contain an iterable.
    """
    loaded = []
    for archive in findloadable(indir):
        loaded.extend(load(archive))
    return loaded

def dedupe(inlist, f):
    """Deduplicate a list using a key function.

    Args:
        inlist: [list] the elements to deduplicate
        f: a callable that maps an element to its dedupe key.  All elements with the same key are duplicates.

    Returns:
        A list with one element per unique key.  The last element seen for a key is kept, and keys are ordered by first occurrence.
    """
    assert callable(f)
    assert isinstance(inlist, list)
    unique = {}
    for item in inlist:
        unique[f(item)] = item  # later duplicates overwrite earlier ones
    return list(unique.values())


def pklbz2(filename, obj=None):
    """Read/Write a bz2 compressed pickle file.

    Args:
        filename: [str] path to the archive, which must end in '.pkl.bz2'
        obj: the object to write.  If None, the archive is read instead of written.

    Returns:
        The filename when writing, or the unpickled object when reading.

    .. note:: Unpickling can execute arbitrary code.  Only read archives from trusted sources.
    """
    assert filename[-8:] == '.pkl.bz2', "Invalid filename - must be '*.pkl.bz2'"
    if obj is not None:
        # Context manager guarantees that the compressed stream is flushed and closed, even on error
        with bz2.BZ2File(filename, 'wb') as f:
            cPickle.dump(obj, f)
        return filename
    with bz2.BZ2File(filename, 'rb') as f:
        return cPickle.load(f)
        

def catcher(f, *args, **kwargs):
    """Call f(*args, **kwargs) and report the outcome as a tuple.  Useful for parallel processing.

    Returns:
        (True, result) on success, or (False, message) where message is the string of the thrown exception.
    """
    assert callable(f)
    try:
        result = f(*args, **kwargs)
    except Exception as err:
        return (False, str(err))
    return (True, result)

def loudcatcher(f, prepend, *args, **kwargs):
    """Call f(*args, **kwargs) and report the outcome as a tuple, printing any exception immediately.  Useful for parallel processing.

    Args:
        f: the callable to run
        prepend: [str] a prefix printed in front of the exception message

    Returns:
        (True, result) on success, or (False, message) where message is the string of the thrown exception.
    """
    assert callable(f)
    try:
        result = f(*args, **kwargs)
    except Exception as err:
        message = str(err)
        print('%s%s' % (prepend, message))
        return (False, message)
    return (True, result)


def nonecatcher(f, *args, **kwargs):
    """Call f(*args, **kwargs) and return its result, or None if any exception is thrown.  Useful for parallel processing"""
    assert callable(f)
    try:
        result = f(*args, **kwargs)
    except Exception:
        result = None
    return result

def trycatcher(f, *args, **kwargs):
    """Call f(*args, **kwargs) and return its result, or None if any exception is thrown.  Useful for parallel processing"""
    assert callable(f)
    outcome = None
    try:
        outcome = f(*args, **kwargs)
    except Exception:
        pass  # deliberately best effort: failures are reported as None
    return outcome

def catchif(f, *args, **kwargs):
    """Alias for `vipy.util.trycatcher`.

    Call f(*args, **kwargs) and return its result on success, or None if any exception is thrown.  Useful for parallel processing.
    """
    return trycatcher(f, *args, **kwargs)


def mergedict(d1, d2):
    """Combine the keys of two dictionaries into a new dictionary.

    ```python
    d1 = {1:2}
    d2 = {3:4}
    d3 = mergedict(d1,d2)
    assert d3 == {1:2, 3:4}
    ```

    Returns:
        A deep copy of d1 that has been updated with the items of d2.  Keys present in both take their value from d2.
        The values of d2 are inserted as-is (they are not copied).
    """
    assert isinstance(d1, dict) and isinstance(d2, dict)
    merged = copy.deepcopy(d1)
    for (key, value) in d2.items():
        merged[key] = value
    return merged


def hascache():
    """Return True if the VIPY_CACHE environment variable is set, False otherwise"""
    return os.environ.get('VIPY_CACHE') is not None

def cache():
    """Return the cache directory: remkdir(VIPY_CACHE) if the environment variable is set, otherwise tempdir()"""
    if hascache():
        return remkdir(os.environ['VIPY_CACHE'])
    return tempdir()

def tocache(filename):
    """Return the path of filename=/path/to/file.ext relocated into the cache directory.

    If the VIPY_CACHE environment variable is set, this is VIPY_CACHE/file.ext.  Otherwise the file is placed in tempdir().
    """
    if hascache():
        cachedir = remkdir(os.environ['VIPY_CACHE'])
    else:
        cachedir = tempdir()
    return os.path.join(cachedir, filetail(filename))

def seconds_to_MMSS_colon_notation(sec):
    """Convert integer seconds in [0, 99*60+59] into MM:SS colon format.  If sec=121, then return '02:01'. """
    assert isinstance(sec, int) and 0 <= sec <= 99*60 + 59
    (minutes, seconds) = divmod(sec, 60)
    return '%02d:%02d' % (minutes, seconds)

def try_import(package, pipname=None, message=None):
    """Show a helpful error message for missing optional packages.

    Args:
        package: [str] the module name to import
        pipname: [str] the name of the package on pip, if different from the module name
        message: [str] if provided, use this as the error message instead of the default install hint

    Raises:
        ImportError: If the package cannot be imported.  The original exception is chained as the cause.
    """
    try:
        importlib.import_module(package)
    except Exception as e:
        # Catch Exception rather than a bare except so that KeyboardInterrupt/SystemExit propagate unchanged
        if message is None:
            message = 'Optional package "%s" not installed -  Run "pip install %s" or "pip install vipy[all]" ' % (package, package if pipname is None else pipname)
        raise ImportError(message) from e


def findyaml(basedir):
    """Return a list of absolute paths to yaml files (*.yml) recursively discovered by walking the directory tree rooted at basedir"""
    found = []
    for path in pathlib.Path(basedir).rglob('*.yml'):
        found.append(str(path.resolve()))
    return found


def findpkl(basedir):
    """Return a list of absolute paths to pkl files (*.pkl) recursively discovered by walking the directory tree rooted at basedir"""
    root = pathlib.Path(basedir)
    return list(map(lambda p: str(p.resolve()), root.rglob('*.pkl')))

def findpklbz2(basedir):
    """Return a list of absolute paths to .pkl.bz2 files recursively discovered by walking the directory tree rooted at basedir"""
    archives = []
    for path in pathlib.Path(basedir).rglob('*.pkl.bz2'):
        archives.append(str(path.resolve()))
    return archives

def findpdf(basedir):
    """Return a list of absolute paths to pdf files (*.pdf) recursively discovered by walking the directory tree rooted at basedir"""
    matches = pathlib.Path(basedir).rglob('*.pdf')
    return [str(match.resolve()) for match in matches]

def findpng(basedir):
    """Return a list of absolute paths to png files (*.png) recursively discovered by walking the directory tree rooted at basedir"""
    pngfiles = []
    for path in pathlib.Path(basedir).rglob('*.png'):
        pngfiles.append(str(path.resolve()))
    return pngfiles

def findjson(basedir):
    """Return a list of absolute paths to json files (*.json) recursively discovered by walking the directory tree rooted at basedir"""
    root = pathlib.Path(basedir)
    return list(map(lambda p: str(p.resolve()), root.rglob('*.json')))

def findtar(basedir):
    """Return a list of absolute paths to tar files (*.tar) recursively discovered by walking the directory tree rooted at basedir"""
    matches = pathlib.Path(basedir).rglob('*.tar')
    return [str(match.resolve()) for match in matches]

def findimage(basedir):
    """Return a list of absolute paths to image files (as decided by `isimage`) recursively discovered by walking the directory tree rooted at basedir"""
    images = []
    for path in pathlib.Path(basedir).rglob('*'):
        if isimage(str(path.resolve())):
            images.append(str(path.resolve()))
    return images

def findimages(basedir):
    """Alias for `vipy.util.findimage`: return absolute paths to all image files found recursively under basedir"""
    return findimage(basedir)

def findvideo(basedir):
    """Return a list of absolute paths to video files (as decided by `isvideo`) recursively discovered by walking the directory tree rooted at basedir"""
    videos = []
    for path in pathlib.Path(basedir).rglob('*'):
        if isvideo(str(path.resolve())):
            videos.append(str(path.resolve()))
    return videos

def findwebp(basedir):
    """Return a list of absolute paths to webp files (as decided by `iswebp`) recursively discovered by walking the directory tree rooted at basedir"""
    webpfiles = []
    for path in pathlib.Path(basedir).rglob('*'):
        if iswebp(str(path.resolve())):
            webpfiles.append(str(path.resolve()))
    return webpfiles

def findvideos(basedir):
    """Alias for `vipy.util.findvideo`: return absolute paths to all video files found recursively under basedir"""
    return findvideo(basedir)

def findloadable(basedir):
    """Return a list of absolute paths to any archive file loadable by `vipy.load` (*.pkl, *.json, *.pkl.bz2).  Recursively search starting from basedir.

    The result lists all pkl files first, then json files, then pkl.bz2 files.
    """
    finders = (findpkl, findjson, findpklbz2)
    return [archive for finder in finders for archive in finder(basedir)]

def readyaml(yamlfile):
    """Read a yaml file and return the parsed object, this is slow for large yaml files.

    .. note:: The file is parsed with yaml.Loader, which can construct arbitrary python objects.  Only read trusted files.
    """
    try_import('yaml', 'pyyaml')
    import yaml
    with open(yamlfile, 'r') as f:
        text = f.read()
    # yaml.CLoader is faster, but not installed via pip
    return yaml.load(text, Loader=yaml.Loader)


def count_images_in_subdirectories(indir):
    """Count the total number of images in indir/subdir1, indir/subdir2, go down only one level and no further...

    Args:
        indir: [str] the parent directory whose subdirectories (as listed by `dirlist`) are searched

    Returns:
        The sum over subdirectories of the number of images returned by `imlist`
    """
    # The subdirectories of the input directory are enumerated (previously this referenced an undefined name)
    return sum(len(imlist(d)) for d in dirlist(indir))


def keymax(d):
    """Return the first key in the dictionary whose value equals the maximum value"""
    largest = max(d.values())
    return next((k for (k, v) in d.items() if v == largest), None)

def keymin(d):
    """Return the first key in the dictionary whose value equals the minimum value"""
    smallest = min(d.values())
    return next((k for (k, v) in d.items() if v == smallest), None)


def isjsonfile(filename):
    """Is the input a string with a non-empty basename that ends with the '.json' extension?"""
    if not isinstance(filename, str):
        return False
    return len(filename) > 5 and filename.endswith('.json')


def writejson(d, outfile):
    """Serialize the object d as JSON to outfile, overwriting any existing file, and return outfile"""
    with open(outfile, 'w') as jsonstream:
        json.dump(d, jsonstream)
    return outfile


def readjson(jsonfile, strict=True):
    """Read jsonfile=/path/to/file.json and return the json parsed object.

    A warning is issued if strict=True and jsonfile does not have a .json extension; the file is parsed regardless.
    """
    if strict and not isjsonfile(jsonfile):
        warnings.warn('Attempting to read JSON file "%s" without .json extension' % jsonfile)
    with open(jsonfile) as f:
        text = f.read()
    return json.loads(text)


def groupby(inset, keyfunc):
    """groupby on unsorted inset.

    The input is first sorted by keyfunc so that itertools.groupby yields exactly one group per unique key.

    Returns:
        An iterator of (key, group-iterator) tuples in increasing key order
    """
    ordered = sorted(inset, key=keyfunc)
    return itertools_groupby(ordered, keyfunc)


def vipy_groupby(inset, keyfunc):
    """Alias for `vipy.util.groupby`: groupby on unsorted inset"""
    return groupby(inset, keyfunc)


def groupbyasdict(togroup, keyfunc):
    """Return a dictionary of keys and lists from a groupby on the unsorted input.
    
    Args:
        togroup: a list of elements to group
        keyfunc:  a lambda function to operate on elements of togroup such that the value returned from the lambda is the equality key for grouping

    Returns:
        A dictionary with unique keys returned from keyfunc, and values are lists of elements in togroup with the same key

    """
    grouped = {}
    for (key, members) in groupby(togroup, keyfunc):
        grouped[key] = list(members)
    return grouped

def countby(inset, keyfunc=lambda x: x):
    """Return dictionary of keys and group sizes for a grouping of the input list by keyfunc lambda function, sorted by increasing count""" 
    sizes = [(key, len(members)) for (key, members) in groupbyasdict(inset, keyfunc).items()]
    sizes.sort(key=lambda pair: pair[1])  # stable sort, so ties keep their grouping order
    return dict(sizes)

def most_frequent(inset, topk=1):
    """Return the most frequent elements as determined by element equality.

    Args:
        inset: the elements to count
        topk: [int] the number of most frequent elements to return.  If None, return all unique elements.

    Returns:
        A list of the topk most frequent unique elements, ordered by increasing frequency so that the most frequent element is last.
    """
    ranked = list(countby(inset).keys())
    if topk is None:
        return ranked
    if topk == 0:
        return []  # ranked[-0:] would incorrectly return every element
    return ranked[-topk:]

def countbyasdict(inset, keyfunc):
    """Alias for `vipy.util.countby`, with keyfunc required"""
    return countby(inset, keyfunc)

def softmax(x, temperature=1.0):
    """Row-wise softmax of a 2D numpy array.

    The row maximum is subtracted before exponentiation, and the shifted values are divided by temperature.
    """
    assert x.ndim == 2
    nrows = x.shape[0]
    rowmax = np.max(x, axis=1).reshape(nrows, 1)
    z = np.exp((x - rowmax) / temperature)
    rowsum = np.sum(z, axis=1).reshape(nrows, 1)
    return z / rowsum


def permutelist(inlist, deterministic=False, seed=42):
    """Randomly permute list order.

    If deterministic=True, the global numpy random generator is seeded with seed before the permutation (so that multiple calls
    return the same permutation) and is then re-seeded without an argument afterwards.
    """
    if deterministic:
        np.random.seed(seed)  # deterministic        
    order = np.random.permutation(list(range(0, len(inlist))))
    shuffled = []
    for index in order:
        shuffled.append(inlist[index])
    if deterministic:
        np.random.seed()  # re-init randomness
    return shuffled


def flatlist(inlist):
    """Convert list of tuples into a list expanded by concatenating tuples.  If the input is already flat, return it unchanged.

    Only one level of nesting is expanded, and only elements that are lists or tuples.
    """
    flat = []
    for element in inlist:
        if isinstance(element, (list, tuple)):
            flat.extend(element)
        else:
            flat.append(element)
    return flat


def rmdir(indir):
    """Recursively remove directory and all contents (if the directory exists), and return the input path"""
    if os.path.isdir(indir):  # False for paths that do not exist
        shutil.rmtree(indir)
    return indir

def dividelist(inlist, fractions):
    """Divide inlist into a list of lists such that the size of each sublist is the requested fraction of the original list. 

       This operation is deterministic and generates the same division in multiple calls.
       
    Args:
        inlist: [list]
        fractions: [tuple] such as (0.1, 0.7, 0.2)   An iterable of fractions that must be non-negative and sum to one

    Returns:
        A list of len(fractions) sublists, taken sequentially from inlist.  Sublist sizes are rounded up, so trailing sublists may be shorter than requested.
    """
    assert all([f >= 0 and f <=1 for f in fractions])
    # Compare with a tolerance: an exact comparison rejects valid inputs such as (0.7, 0.2, 0.1) due to floating point rounding
    assert np.isclose(np.sum(fractions), 1.0)
    assert len(inlist) >= len(fractions)
    N = np.int32(np.maximum(0, np.ceil(len(inlist)*np.array(fractions))))
    outlist = []
    for n in N:
        outlist.append(inlist[0:n])
        inlist = inlist[n:]  # consume the elements that were just assigned
    return outlist


def pairwise(iterable, prepad=False, postpad=False, padval=None):
    """Equivalent to python-3.10 itertools.pairwise, with optional padding at either end. 
    
    >>> pairwise('ABCD') --> (A,B), (B,C), (C,D)
    >>> pairwise('ABCD', prepad=True, padval=0) --> (0,A), (A,B), (B,C), (C,D)
    >>> pairwise('ABCD', postpad=True) --> (A,B), (B,C), (C,D), (D,None)
    >>> pairwise([(1,1),(2,2)], prepad=True, postpad=True, padval=(None,None)) --> [((None, None), (1, 1)), ((1, 1), (2, 2)), ((2, 2), (None, None))]

    Returns:
        A zip iterator over consecutive pairs
    """
    (first, second) = tee(iterable, 2)
    if not prepad:
        next(second, None)  # advance the second iterator so that it runs one element ahead of the first
    else:
        first = chain([padval], first)  # the first iterator is delayed by the pad value instead
    if postpad:
        second = chain(second, [padval])
    return zip(first, second)


def chunklist(inlist, num_chunks):
    """Convert list into a list of num_chunks lists, such that each element is a list containing a sequential chunk of the original list.
    
    ```python
    (A,B,C) = vipy.util.chunklist(inlist, num_chunks=3)
    assert len(A) == int(np.ceil(len(inlist) / 3))
    ```

    .. note::  Each chunk has length ceil(len(inlist)/num_chunks), so for ragged chunks the trailing chunks will be shorter (and may be empty)
    """
    total = len(inlist)
    chunksize = int(np.ceil(float(total) / float(num_chunks)))
    chunks = []
    for i in range(0, num_chunks):
        start = i * chunksize
        chunks.append(inlist[start:min(start + chunksize, total)])
    return chunks


def chunklistbysize(inlist, size_per_chunk):
    """Convert list into a list of lists such that each element is a list
    containing a sequential chunk of the original list of length
    size_per_chunk.  The last chunk may be shorter."""
    assert size_per_chunk >= 1
    chunks = []
    for start in range(0, len(inlist), size_per_chunk):
        chunks.append(inlist[start:start + size_per_chunk])
    return chunks

def chunklistWithOverlap(inlist, size_per_chunk, overlap_per_chunk):
    """Convert list into a list of lists such that each element is a list
    containing a sequential chunk of the original list of length
    size_per_chunk, where consecutive chunks share overlap_per_chunk elements.
    The last chunk may be shorter."""
    assert size_per_chunk >= 1 and overlap_per_chunk >= 0 and size_per_chunk > overlap_per_chunk
    stride = size_per_chunk - overlap_per_chunk  # distance between the starts of consecutive chunks
    chunks = []
    for stop in range(size_per_chunk, len(inlist) + stride, stride):
        chunks.append(inlist[stop - size_per_chunk:stop])
    return chunks


def chunklistwithoverlap(inlist, size_per_chunk, overlap_per_chunk):
    """Alias for `vipy.util.chunklistWithOverlap`"""
    return chunklistWithOverlap(inlist, size_per_chunk, overlap_per_chunk)

def imwritejet(img, imfile=None):
    """Write a grayscale numpy image as a colormapped image to the given file.

    The 2D input is passed through the matplotlib 'gist_rainbow' colormap and the RGB channels are saved as uint8.

    Args:
        img: a 2D numpy array
        imfile: [str] the output image file.  If None, a temporary png filename is used.

    Returns:
        The output image filename

    Raises:
        ValueError: If the input is not a numpy array, or is not 2D
    """
    if imfile is None:
        imfile = temppng()
    if not isnumpy(img):
        raise ValueError('Input must be numpy array')
    if img.ndim != 2:
        raise ValueError('Input must be a 2D numpy array')

    colormap = plt.get_cmap('gist_rainbow')
    rgb = np.uint8(255 * colormap(img)[:,:,:3])  # drop the alpha channel
    PIL.Image.fromarray(rgb).save(os.path.expanduser(imfile))
    return imfile


def isuint8(img):
    """Is the input a numpy array with dtype uint8?"""
    is_array = isnumpy(img)
    return is_array and img.dtype == np.dtype('uint8')

def isnumber(x):
    """Is the input a python type of a number or a string containing a number?"""
    if isinstance(x, (int, float)):
        return True
    return (isnumpy(x) and np.isscalar(x)) or (isstring(x) and isfloat(x))


def isfloat(x):
    """Is the input a float or a string that can be converted to float?

    Returns:
        True if float(x) succeeds, False otherwise (including for inputs such as None or a list that float() rejects with a TypeError)
    """
    try:
        float(x)
    except (ValueError, TypeError, OverflowError):
        # ValueError: unparseable string, TypeError: unsupported type, OverflowError: integer too large for a float
        return False
    return True


def imwritegray(img, imfile=None):
    """Write a grayscale numpy image to a file.

    A uint8 image is assumed to be in the range [0,255] and is written unchanged.  A float32 image is assumed to be in [0,1] and is scaled to [0,255].

    Args:
        img: a numpy array of dtype uint8 or float32
        imfile: [str] the output image file.  If None, a temporary png filename is used.

    Returns:
        The output image filename

    Raises:
        ValueError: If the input is not a numpy array, or has an unsupported dtype
    """
    if imfile is None:
        imfile = temppng()
    if not isnumpy(img):
        raise ValueError('Input must be numpy array')

    if img.dtype == np.dtype('uint8'):
        pixels = img
    elif img.dtype == np.dtype('float32'):
        pixels = np.uint8(img * 255.0)
    else:
        raise ValueError('Unsupported datatype - '
                         'Numpy array must be uint8 or float32')
    PIL.Image.fromarray(pixels).save(os.path.expanduser(imfile))
    return imfile


def imwrite(img, imfile=None, writeas=None):
    """Write a 2D numpy image as jet or gray, or a 3D numpy image as rgb or bgr.

    Args:
        img: a numpy array.  3D arrays must be uint8 in [0,255] or float32 in [0,1].
        imfile: [str] the output image file.  If None, a temporary png filename is used.
        writeas: [str] one of ['jet', 'gray', 'rgb', 'bgr'].  If None, then 2D arrays are written as 'gray' and all others as 'bgr'.

    Returns:
        The output image filename, with the user home directory expanded

    Raises:
        ValueError: If the input is not a numpy array, writeas is unsupported, the array is not 3D for 'rgb'/'bgr',
            or the dtype of a 3D array is unsupported (previously this case silently wrote nothing)
    """
    if imfile is None:
        imfile = temppng()
    if not isnumpy(img):
        raise ValueError('image must by numpy object')
    if writeas is None:
        writeas = 'gray' if img.ndim == 2 else 'bgr'

    imfile = os.path.expanduser(imfile)
    if writeas in ['jet']:
        imwritejet(img, imfile)
    elif writeas in ['gray']:
        imwritegray(img, imfile)
    elif writeas in ['rgb', 'bgr']:
        if img.ndim != 3:
            raise ValueError('numpy array must be 3D')
        if img.dtype == np.dtype('uint8'):
            pixels = img
        elif img.dtype == np.dtype('float32'):
            pixels = np.uint8(255.0 * img)  # convert [0,1] to uint8 [0,255]
        else:
            raise ValueError('Unsupported datatype - Numpy array must be uint8 or float32')
        # NOTE(review): as in the original, 'rgb' input has its channels swapped by rgb2bgr before saving and 'bgr' input is
        # saved unchanged - confirm that this is the intended channel order for the PIL writer
        PIL.Image.fromarray(rgb2bgr(pixels) if writeas == 'rgb' else pixels).save(imfile)
    else:
        raise ValueError('unsupported writeas')

    return imfile


def print_and_return(x):
    """Print x and return it unchanged, which is useful for inspecting a value inside an expression"""
    print(x)
    return x


def savetemp(img):
    """Save a numpy image, cast to uint8, as a uniquely named png in the system temporary directory.

    Args:
        img: a numpy array accepted by PIL.Image.fromarray after casting to uint8

    Returns:
        The path to the saved png file
    """
    # Use the platform temporary directory rather than a hard-coded '/tmp', which does not exist on all systems
    f = os.path.join(tempfile.gettempdir(), '%s.png' % uuid.uuid1().hex)
    PIL.Image.fromarray(img.astype(np.uint8)).save(f)
    return f


def gray2jet(img):
    """Convert a [0,1] grayscale image to a [0,255] uint8 RGB image using the matplotlib 'jet' colormap"""
    import matplotlib.pyplot as plt
    rgba = plt.get_cmap('jet')(img)
    return np.uint8(255.0 * rgba[:, :, 0:3])  # drop the alpha channel


def jet(n, bgr=False):
    """Return the jet colormap sampled at n levels.

    Args:
        n: [int] the number of colors
        bgr: [bool] if False, return the colors in RGBA channel order, otherwise in BGRA channel order

    Returns:
        An nx4 uint8 numpy array of colors in [0,255], where the last column is the alpha channel
    """
    # matplotlib.cm.get_cmap was removed in recent matplotlib releases, pyplot.get_cmap accepts the same (name, lut) arguments
    import matplotlib.pyplot as plt
    cmap = plt.get_cmap('jet', n)
    rgba = np.uint8(255 * cmap(np.arange(n)))
    # Swap only the color channels: flipping all four columns would move alpha to the front (ABGR)
    return rgba if bgr is False else rgba[:, [2, 1, 0, 3]]


def is_email_address(email):
    """Is the provided string an email address?

    This is a syntactic check only: local-part@domain.tld, where the top level domain is at least two letters.
    """
    # The top level domain is letters only ('[A-Z|a-z]' would also admit a literal '|')
    regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return re.fullmatch(regex, email) is not None


def is_hiddenfile(filename):
    """Does the filename start with a period?  An empty filename is not hidden."""
    # Slicing (rather than indexing) is safe for the empty string
    return filename[:1] == '.'


def seq(start, stop, step=1):
    """Equivalent to matlab [start:step:stop]

    Args:
        start: the first element
        stop: the inclusive upper bound (lower bound for a negative step)
        step: the non-zero increment

    Returns:
        The list [start, start+step, ...] of all elements that do not pass stop.  This is empty if stop cannot be reached from start
        in the direction of step, and is [start] if start == stop.
    """
    # Number of whole steps that fit in the interval.  The small tolerance absorbs floating point error in the
    # division (e.g. 0.3/0.1 = 2.9999999999999996) without ever stepping past stop.
    n = int(np.floor((stop - start) / float(step) + 1e-9))
    if n < 0:
        return []
    return [start + step * i for i in range(n + 1)]


def loadh5(filename):
    """Load an HDF5 file.

    The dataset whose name is filebase(filename) is read fully into memory and the file is closed before returning.

    Raises:
        ValueError: If `ishdf5` rejects the filename
    """
    if not ishdf5(filename):
        raise ValueError('Invalid HDF5 file "%s" ' % filename)
    try_import('h5py'); import h5py
    with h5py.File(filename, 'r') as f:
        # dataset[()] reads the whole dataset, replacing the Dataset.value attribute that was removed in h5py 3
        return f[filebase(filename)][()]



def loadmat73(matfile, keys=None):
    """Load a Matlab 7.3 format file, which is stored as HDF5.

    Args:
        matfile: [str] path to the mat file
        keys: [list] a list of keys to access the HDF5 file as f[key1][key2]...  If None, the open file object is returned.

    Returns:
        If keys is None, the open h5py.File (the caller is responsible for closing it).  Otherwise, the selected entry as a
        numpy array, and the file is closed before returning.
    """
    try_import('h5py'); import h5py
    f = h5py.File(matfile, 'r')
    if keys is None:
        return f
    try:
        node = f
        for k in keys:
            node = node[k]
        return np.array(node)  # copy into memory so that the file can be closed
    finally:
        f.close()


def take(inlist, k):
    """Take k elements at random from inlist, without replacement.

    If inlist has k or fewer elements, the input is returned unchanged (not shuffled, not copied).
    """
    if len(inlist) <= k:
        return inlist
    chosen = np.random.permutation(range(len(inlist)))[0:k]
    return [inlist[i] for i in chosen]

def takeone(inlist):
    """Take one element at random from inlist or return None if empty"""
    if len(inlist) >= 1:
        return take(list(inlist), k=1)[0]
    return None

def takelast(inlist):
    """Take last element from inlist (after conversion with `tolist`) or return None if empty"""
    if len(tolist(inlist)) < 1:
        return None
    return tolist(inlist)[-1]

def tryload(infile, abspath=False):
    """Attempt to load an archive file with `vipy.util.load`, and return the value if successful and None if not.

    Args:
        infile: [str] the archive file to load
        abspath: [bool] passed through to `vipy.util.load`
    """
    try:
        return load(infile, abspath=abspath)
    except Exception:
        # Best effort: any loading error is reported as None, but KeyboardInterrupt/SystemExit still propagate
        return None

def canload(infile):
    """Attempt to load an archive file, and return true if it can be successfully loaded, otherwise False"""
    try:
        load(infile, abspath=True)
    except Exception:
        # Any loading error means not loadable, but KeyboardInterrupt/SystemExit still propagate
        return False
    return True




def repath(v, srcpath, dstpath):
    """Change the filename with prefix srcpath to dstpath, for any element in v that supports the filename() api.

    Args:
        v: a string, an object with filename() and clone() attributes, or a list of such objects
        srcpath: [str] the path to replace.  This is normalized with os.path.normpath.
        dstpath: [str] the replacement path.  This is normalized with os.path.normpath.

    Returns:
        For a string, the string with all occurrences of srcpath replaced by dstpath.  For an object (or list of objects), the
        result of calling obj.filename(newfilename), or the object itself if its filename is None.

    Raises:
        ValueError: If the input is not one of the supported types
    """
    def _swap(path):
        """Replace the normalized source path with the normalized destination path"""
        return path.replace(os.path.normpath(srcpath), os.path.normpath(dstpath))

    def _supported(obj):
        return hasattr(obj, 'filename') and hasattr(obj, 'clone')

    def _relocate(obj):
        if obj.filename() is None:
            return obj
        return obj.filename(_swap(obj.filename()))

    if not islist(v) and _supported(v):
        return _relocate(v)
    if islist(v) and all([_supported(vv) for vv in v]):
        return [_relocate(vv) for vv in v]
    if isstring(v):
        return _swap(v)
    raise ValueError('Input must be a singleton or list of vipy.image.Image() or vipy.video.Video() objects, not type "%s"' % (str(type(v))))
    

def scpsave(V, username=None):
    """Save an archive file to load via SCP.

    Use case:

    - This archive format is useful to allow access to videos and images that are accessible behind a remote server for which you have access via SSH key-based authentication.
    - You create this archive on the remote server, and all vipy objects are replaced with references to remote media.
    - Every video or image is replaced with a URL of the format 'scp://USER@HOST:/path/to.mp4'.  
    - Vipy will use your SSH keys to SCP these media files from USER@HOST on demand, so that the videos are cached for you on your local machine when you need them.
    - This is useful for transparently visualizing large datasets that are hidden behind an SSH-only accessible server

    Usage:
    
    ```python
    outfile = vipy.util.scpsave([vipy.video.Video(filename='/path/to.mp4')])  # run on remote machine that you have SSH key access
    V = vipy.util.scpload(outfile)  # run on local machine that has SSH key access to remote machine
    V[0].load()  # this will SCP the videos from 'scp:///path/to.mp4' to $VIPY_CACHE/to.mp4 transparently and on demand
    ```

    Args:
        V: [vipy objects] A list of vipy objects or `vipy.dataset.Dataset`
        username: [str] Your username on the remote machine to select the proper SSH key

    Returns:
        The 'scp://[USER@]HOST:/path/to/archive.pkl' URL of a temp archive file stored on this machine.  Anything that is not a recognized vipy object (or list/dataset of them) is archived unchanged.

    """

    import vipy.image
    import vipy.video
    import vipy.dataset  # Dataset is referenced below; import it explicitly rather than relying on a transitive import

    # Common 'scp://[USER@]HOST' prefix shared by every media URL and by the archive URL itself
    remote = 'scp://%s%s' % (('%s@' % username) if username is not None else '', socket.gethostname())

    if isinstance(V, vipy.dataset.Dataset) and V._isvipy():
        v = V.localmap(lambda v: v.clone().url('%s:%s' % (remote, v.filename())).nofilename())
    elif (isinstance(V, vipy.image.Image) or isinstance(V, vipy.video.Video)) and V.hasfilename():
        v = V.clone().url('%s:%s' % (remote, V.filename())).nofilename()
    elif islist(V) and all([isinstance(v, vipy.image.Image) or isinstance(v, vipy.video.Video) for v in V]):
        v = [v.clone().url('%s:%s' % (remote, v.abspath().filename())).nofilename() for v in V]
    else:
        v = V  # no vipy objects

    pklfile = '%s:%s' % (remote, save(v, temppkl()))
    cmd = "V = vipy.util.scpload('%s')" % pklfile
    print('[vipy.util.scpsave]: On a local machine where you have public key ssh access to this remote machine run:\n>>> %s\n' % cmd)
    return pklfile


def scpload(url):
    """Fetch the archive at an scp url (as produced by `vipy.util.scpsave`) into a temp file and load it"""
    import vipy.downloader
    localfile = templike(url)  # temp filename with the same extension as the url
    return load(vipy.downloader.scp(url, localfile))


def load_opencv_yaml(yamlfile):
    """Load a matrix from a YAML file exported from OpenCV and return it as a numpy array with singleton dimensions removed.

    NOTE(review): this relies on a module-level name `cv` (legacy `cv.Load` API) that is not imported in the visible part of this module -- confirm it is defined elsewhere.
    """
    mtx = cv.Load(yamlfile)
    return np.squeeze(np.array(mtx))


def matrix_to_opencv_yaml(yamlfile, mtxlist, mtxname=None):
    """Write a list of matrices to an OpenCV YAML file with the given variable names.

    Args:
        yamlfile: the output YAML filename
        mtxlist: a numpy matrix, or a list/tuple of numpy matrices
        mtxname: a name or list of names, one per matrix.  Defaults to 'mtx_00', 'mtx_01', ...

    Returns:
        yamlfile
    """
    def _write_matrix(f, M, name):
        f.write('    %s: !!opencv-matrix\n' % name)
        f.write('       rows: %d\n' % M.shape[0])
        f.write('       cols: %d\n' % (M.shape[1] if M.ndim == 2 else 1))
        f.write('       dt: f\n')
        f.write('       data: [ ')
        values = ['%.6e' % x for x in M.flatten()]
        # Four values per line; every line but the last ends with a trailing comma and a continuation indent
        for start in range(0, len(values), 4):
            chunk = ', '.join(values[start:start + 4])
            if start + 4 < len(values):
                f.write(chunk + ', ' + '\n           ')
            else:
                f.write(chunk)
        f.write(']\n')

    mtxlist = tolist(mtxlist)
    if mtxname is None:
        mtxname = ['mtx_%02d' % k for k in range(0, len(mtxlist))]
    elif isinstance(mtxname, str):
        # A bare string names a single matrix; zipping it directly would iterate over its characters
        mtxname = [mtxname]
    with open(yamlfile, 'w') as f:
        f.write('%YAML:1.0\n')
        for (m, mname) in zip(mtxlist, mtxname):
            _write_matrix(f, m, mname)

    return yamlfile


def save_opencv_yaml(yamlfile, mat):
    """Write a single numpy array to an OpenCV-importable YAML file under the name 'mtx_01', and return yamlfile"""

    def _emit(out, M):
        out.write('    mtx_01: !!opencv-matrix\n')
        out.write('       rows: %d\n' % M.shape[0])
        out.write('       cols: %d\n' % (M.shape[1] if M.ndim == 2 else 1))
        out.write('       dt: f\n')
        out.write('       data: [ ')
        values = ['%.6e' % x for x in M.flatten()]
        # Four values per line; every line but the last ends with a trailing comma and a continuation indent
        for start in range(0, len(values), 4):
            chunk = ', '.join(values[start:start + 4])
            if start + 4 < len(values):
                out.write(chunk + ', ' + '\n           ')
            else:
                out.write(chunk)
        out.write(']\n')

    with open(yamlfile, 'w') as out:
        out.write('%YAML:1.0\n')
        _emit(out, mat)

    return yamlfile


def tofilename(s, hyphen=True):
    """Convert an arbitrary string to a valid filename.

    Spaces (and hyphens, if hyphen=True) are replaced with underscores, then every character that is not
    an ASCII letter, digit, '-', '_' or '.' is dropped.

    Args:
        s: the input string
        hyphen: if True, replace '-' with '_'

    Returns:
        The sanitized string
    """
    import string  # ascii_letters/digits live in the string module, not on the str type

    valid_chars = frozenset("-_.%s%s" % (string.ascii_letters, string.digits))
    s = s.replace(' ', '_')
    if hyphen:
        s = s.replace('-', '_')
    return "".join(x for x in s if x in valid_chars)


def isexe(filename):
    """Is filename an existing regular file that the current process is allowed to execute?"""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.X_OK)


def isinstalled(cmd):
    """Is the command available on the path (as resolved by shutil.which)?"""
    located = shutil.which(cmd)
    return located is not None


def isextension(filename, ext):
    """Does the filename end with the extension ext? 
    
    ```python
    isextension('/path/to/myfile.json', 'json') == True
    isextension('/path/to/myfile.json', '.json') == True
    isextension('/path/to/myfile.json', '.pkl') == False
    isextension('/path/to/myjson', 'json') == False
    ```

    Args:
        filename: the filename to test, or None
        ext: the extension, with or without the leading dot

    Returns:
        True if filename ends with '.ext', False otherwise or if filename is None
    """
    if filename is None:
        return False
    if isinstance(ext, str) and ext and not ext.startswith('.'):
        # Require the separating dot so that 'json' does not match a name like 'myjson'
        ext = '.' + ext
    return filename.endswith(ext)

def ispkl(filename):
    """Is filename a string ending in '.pkl'?"""
    return isstring(filename) and filename.endswith('.pkl')

def ispklbz2(filename):
    """Is filename a string ending in '.pkl.bz2'?"""
    return isstring(filename) and filename.endswith('.pkl.bz2')

def ispklfile(filename):
    """Alias for `vipy.util.ispkl`: is filename a string with a '.pkl' suffix?"""
    return ispkl(filename)

def ishtml(filename):
    """Does the filename end with '.html' (case insensitive)?"""
    return filename.lower().endswith('.html')

def ispickle(filename):
    """Is filename an existing file whose extension is '.pk' or '.pkl' (case insensitive), or whose name ends in '.pkl'?"""
    if not (isfile(filename) and os.path.exists(filename)):
        return False
    ext = fileext(filename)
    if ext is not None and ext.lower() in ['.pk', '.pkl']:
        return True
    return filename[-4:] == '.pkl'


def ishdf5(path):
    """Does the path have a '.h5' extension (case insensitive)?  Only the extension is checked, not the file contents."""
    ext = os.path.splitext(path)[1]
    return ext.lower() in ['.h5']


def filebase(filename):
    """Return c for filename /a/b/c.ext
    
    .. warning:: Will return /a/b/c.d for multidot filenames with more than two trailing dots like /a/b/c.d.e.f (e.g. /a/b/my.filename.tar.gz)
    """
    tail = os.path.split(filename)[1]
    (base, _) = splitext(tail)
    return base


def filepath(filename, depth=0):
    """Return /a/b/c for filename /a/b/c/d.ext, /a/b for filename /a/b/c/d.ext if depth=1, etc"""
    parent = os.path.dirname(filename)
    for _ in range(depth):
        parent = os.path.dirname(parent)  # climb one more directory level
    return parent


def delpath(indir, filename):
    """Return c/d.ext for filename /a/b/c/d.ext and indir /a/b

    Args:
        indir: the directory prefix to remove, with or without a trailing separator
        filename: a filename that contains indir as a directory

    Returns:
        Everything in filename after the first occurrence of indir followed by a path separator

    Raises:
        AssertionError: if indir (as a directory) is not found in filename
    """
    prefix = os.path.join(indir, '')  # /a/b -> /a/b/
    assert prefix in filename, 'Path "%s" not found in filename "%s"' % (indir, filename)
    # Split once only, so a later repeat of indir in the path does not truncate the result
    return filename.split(prefix, 1)[1]

    
def newpath(filename, newdir):
    """Return /d/e/c.ext for filename /a/b/c.ext and newdir /d/e/"""
    return os.path.join(newdir, os.path.basename(filename))

def newprefix(filename, newprefix, depth=0):
    """Return /a/b/c/h/i.ext for filename /f/g/h/i.ext and prefix /a/b/c and depth=1

    Args:
        filename: the filename whose leading directories are replaced
        newprefix: the replacement for the leading directories
        depth: how many trailing directory levels of filename to keep (see `vipy.util.filepath`)

    Returns:
        The normalized path with the leading directories replaced by newprefix
    """
    p = filepath(filename, depth=depth)
    if len(p) == 0:
        # No leading directory to replace (e.g. a bare 'i.ext'): place the file under the new prefix
        return os.path.normpath(os.path.join(newprefix, filename))
    # p is a leading prefix of filename; replace only that occurrence, not later repeats inside the path
    return os.path.normpath(filename.replace(p, newprefix, 1))

def newpathdir(filename, olddir, newdir, n=1):
    """Return /a/b/n/d/e.ext for filename=/a/b/c/d/e.ext, olddir=c, newdir=n

    Args:
        filename: the input path
        olddir: the path component to replace, matched exactly against whole components
        newdir: the replacement component
        n: the number of components of filename that must equal olddir

    Returns:
        The path with every component equal to olddir replaced by newdir

    Raises:
        AssertionError: if the number of components equal to olddir is not n
    """
    p = pathlib.PurePath(filename)
    assert sum([d == olddir for d in p.parts]) == n, "Path must have exactly %s directory matches" % n
    # Swap whole components only; a substring replace would also rewrite names that merely contain olddir
    return os.path.join(*[newdir if d == olddir else d for d in p.parts])


def newpathroot(filename, newroot):
    """Return /r/b/c.ext for filename /a/b/c.ext and new root directory r"""
    pure = pathlib.PurePath(filename)
    parts = list(pure.parts)
    # For absolute paths parts[0] is the root separator, so the first directory is parts[1]
    index = 1 if len(pure.root) > 0 else 0
    parts[index] = newroot
    return os.path.join(*parts)

def topath(filename, newdir):
    """Alias for `vipy.util.newpath`: return the file name of filename joined onto the directory newdir"""
    return newpath(filename, newdir)


def filefull(f):
    """Return /a/b/c for filename /a/b/c.ext

    The extension is the (possibly two part) extension reported by `vipy.util.fileext`.
    If there is no extension, f is returned unchanged.
    """
    ext = fileext(f, multidot=True, withdot=True)
    if ext and f.endswith(ext):
        # Strip the trailing extension only; str.replace would also delete it from directory names
        return f[:-len(ext)]
    return f


def filetail(filename):
    """Return c.ext for filename /a/b/c.ext"""
    return os.path.basename(filename)


def matread(txtfile, delimiter=' '):
    """Read a text file of delimited numbers into a numpy array, one row per line.

    Args:
        txtfile: the text file to read
        delimiter: the column separator.  The default ' ' splits on any run of whitespace.

    Returns:
        A float32 numpy array with one row per non-blank line
    """
    rows = []
    # Text mode: in binary mode each row is bytes and cannot be split with a str delimiter
    with open(txtfile, 'r') as f:
        for line in f:
            line = line.strip()
            if len(line) == 0:
                continue  # skip blank lines, such as a trailing newline at the end of the file
            tokens = line.split() if delimiter == ' ' else line.split(delimiter)
            rows.append(np.array([float(t) for t in tokens], dtype=np.float32))
    return np.array(rows)


def imlist(imdir):
    """Return the absolute paths of the non-hidden files in imdir that have an image extension"""
    names = [name for name in os.listdir(imdir) if (isimg(name) and not is_hiddenfile(name))]
    return [os.path.abspath(os.path.join(imdir, name)) for name in names]


def videolist(videodir):
    """Return the absolute paths of the non-hidden files in videodir that have a video extension"""
    names = [name for name in os.listdir(videodir) if (isvideo(name) and not is_hiddenfile(name))]
    return [os.path.abspath(os.path.join(videodir, name)) for name in names]


def dirlist(indir):
    """Return the absolute paths of the non-hidden subdirectories directly inside indir"""
    subdirs = []
    for name in os.listdir(indir):
        candidate = os.path.join(indir, name)
        if os.path.isdir(candidate) and not is_hiddenfile(name):
            subdirs.append(os.path.abspath(candidate))
    return subdirs


def dirlist_sorted_bycreation(indir):
    """Return the subdirectories of indir (see `vipy.util.dirlist`) sorted by descending st_ctime, newest first"""
    subdirs = dirlist(indir)
    return sorted(subdirs, key=os.path.getctime, reverse=True)


def extlist(indir, ext):
    """Return the absolute paths of the files in indir whose extension (as reported by `vipy.util.fileext`)
    matches ext, case insensitive.  ext must include the prepended dot, such as ext='.mp4'"""
    matched = []
    for name in os.listdir(indir):
        name_ext = fileext(name)
        if name_ext is not None and name_ext.lower() == ext.lower():
            matched.append(os.path.abspath(os.path.join(indir, name)))
    return matched

def listext(indir, ext):
    """Alias for `vipy.util.extlist`: list the files in indir with the given extension"""
    return extlist(indir, ext)

def jsonlist(indir):
    """Return the absolute paths of the JSON files in a directory.

    A file is matched purely on its '.json' suffix, so names with extra dots are handled.
    A file named exactly '.json' is not included.
    """
    return [os.path.abspath(os.path.join(indir, name))
            for name in os.listdir(indir)
            if len(name) > 5 and name.endswith('.json')]

def listjson(indir):
    """Alias for `vipy.util.jsonlist`: list the '.json' files in indir"""
    return jsonlist(indir)

def writelist(mylist, outfile, mode='w'):
    """Write each element of mylist (converted with str) on its own line of outfile.

    Args:
        mylist: an iterable of elements to write
        outfile: the output file.  A leading '~' is expanded and the path is made absolute.
        mode: the file open mode, 'w' to overwrite or 'a' to append

    Returns:
        The absolute path of outfile
    """
    outfile = os.path.abspath(os.path.expanduser(outfile))
    with open(outfile, mode) as f:
        f.writelines(str(s) + '\n' for s in mylist)
    return outfile


def readlist(infile):
    """Read a text file into a list with one whitespace-stripped string per line"""
    with open(infile, 'r') as f:
        return [line.strip() for line in f]


def readtxt(infile):
    """Alias for `vipy.util.readlist`: read a text file into a list with one stripped string per row"""
    return readlist(infile)


def writecsv(list_of_tuples, outfile=None, mode='w', separator=',', header=None, comment='# '):
    """Write a list of tuples to a csv file, one row per list element with the tuple elements joined by the separator.

    Examples:
    ```python
    vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv')
    vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv', separator=';')
    vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv', header=('h1','h2','h3'))
    ```

    Args:
        list_of_tuples: a list of tuples where each tuple is a row, or a numpy array which is converted with tolist()
        outfile: the csv file output, or None to write to a new temporary csv filename
        mode: 'w' for overwrite, 'a' for append
        separator: a string specifying the separator between columns.  defaults to ','
        header: a tuple of strings written as the first row of the csv file
        comment:  the comment symbol prepended to the first element of the header row

    Returns:
        the outfile path
    """

    rows = list_of_tuples.tolist() if isnumpy(list_of_tuples) else list_of_tuples
    if header is not None:
        # Only the first header column carries the comment symbol
        headerrow = tuple([comment + h if k == 0 else h for (k, h) in enumerate(header)])
        rows = [headerrow] + rows
    outfile = tempcsv() if outfile is None else os.path.abspath(os.path.expanduser(outfile))
    with open(outfile, mode) as f:
        for row in rows:
            if len(row) > 0:  # empty rows write nothing, not even a newline
                f.write(separator.join([str(v) for v in row]) + '\n')
    return outfile


def readcsv(infile, separator=',', ignoreheader=False, comment=None, ignore_header=False):
    """Read a csv file into a list of lists, ignore any rows prepended with comment symbol, ignore first row if ignoreheader=True

    Args:
        infile: the csv file input
        separator: a string specifying the separator between columns.  defaults to ','
        ignoreheader: if true, ignore the first row of the csv file
        ignore_header: if true, ignore the first row of the csv file (argument synonym)
        comment:  if provided, ignore all rows whose first element starts with this comment string (for example '#' or '# ')

    Returns:
        a list of lists, each list element containing a list of stripped elements in the corresponding line of the csv file, parsed by separator.
        A blank line is returned as [''].
    """

    with open(infile, 'r') as f:
        rows = [[x.strip() for x in line.split(separator)] for line in f]
    if (ignoreheader or ignore_header) and len(rows) > 0:
        rows = rows[1:]
    if comment is not None and len(comment) > 0:
        # startswith handles multi-character comment symbols and blank lines (whose first element is '')
        rows = [r for r in rows if len(r) == 0 or not r[0].startswith(comment)]
    return rows


def readcsvwithheader(infile, separator=','):
    """Read a csv file with a header row.

    Args:
        infile: the csv file input
        separator: a string specifying the separator between columns.  defaults to ','

    Returns:
        A tuple (rows, header_dict).  rows is a list of lists of stripped strings, one per line after the header.
        header_dict maps each header column name to its column index.  An empty file returns ([], {}).
    """
    with open(infile, 'r') as f:
        rows = [[x.strip() for x in line.split(separator)] for line in f]
    if len(rows) == 0:
        return [], {}  # no header row to index
    header_dict = {name: i for (i, name) in enumerate(rows[0])}
    return rows[1:], header_dict


def imsavelist(imdir, outfile):
    """Write the absolute path of every image in directory imdir (see `vipy.util.imlist`)
    to outfile, one path per line, and return the value returned by `vipy.util.writelist`"""
    return writelist(imlist(imdir), outfile)


def csvlist(imdir):
    """Return the paths (imdir joined with the file name) of the files in imdir that have a csv extension"""
    names = [name for name in os.listdir(imdir) if iscsv(name)]
    return [os.path.join(imdir, name) for name in names]


def pklist(indir):
    """Alias for `vipy.util.listpkl`: list the pickle files in indir"""
    return listpkl(indir)

def listpkl(indir):
    """Return the paths (indir joined with the file name) of the files in indir that satisfy `vipy.util.ispickle`"""
    candidates = [os.path.join(indir, name) for name in os.listdir(indir)]
    return [path for path in candidates if ispickle(path)]


def txtlist(imdir):
    """Return the paths (imdir joined with the file name) of the non-hidden text files in imdir"""
    names = [name for name in os.listdir(imdir) if istextfile(name) and not is_hiddenfile(name)]
    return [os.path.join(imdir, name) for name in names]


def imlistidx(filelist, idx_in_filename):
    """Return the indexes of the filenames in filelist that contain the integer idx_in_filename.

    A filename only matches if the number appears after its first character.
    """
    matches = []
    for (i, item) in enumerate(filelist):
        if item.find('%d' % idx_in_filename) > 0:
            matches.append(i)
    return matches


def mat2gray(img, min=None, max=None):
    """Linearly rescale a numpy array so that min maps to 0 and max maps to 1.0.

    Args:
        img: a numpy array
        min: the value mapped to 0, defaults to the minimum of img
        max: the value mapped to 1.0, defaults to the maximum of img

    Returns:
        The rescaled array, or img itself unchanged if max - min is not positive
    """
    lower = min if min is not None else np.min(img)
    upper = max if max is not None else np.max(img)
    span = upper - lower
    if span > 0:
        return (np.float32(img) - lower) / span
    return img


def mdlist(m, n):
    """Preallocate a 2D list with m rows of n columns, every entry None.  Each row is a distinct list."""
    return [[None for _ in range(n)] for _ in range(m)]


def isurl(path):
    """Is a path a URL?  It requires a url scheme and url netloc without any common unallowed characters

    Args:
        path: the candidate string.  Inputs that cannot be parsed (such as None) return False.

    Returns:
        True if path parses with a non-empty scheme and netloc and contains none of the characters '>', '<' or '"'
    """
    try:
        url = urlparse(path)
        return not any(c in path for c in ('>', '<', '"')) and bool(url.scheme) and bool(url.netloc)
    except Exception:
        # Unparseable or non-string input is simply not a URL; KeyboardInterrupt/SystemExit still propagate
        return False

def shortuuid(n=16):
    """Generate a short identifier of n hex digits, taken from the SHA-256 digest of a fresh uuid1"""
    seed = uuid.uuid1().hex.encode('utf-8')
    digest = hashlib.sha256(seed).hexdigest()
    return digest[0:n]

def stringhash(s, n=16):
    """Generate a repeatable hash for a string s: the first n hex characters of the SHA-256 digest of its utf-8 encoding"""
    digest = hashlib.sha256(s.encode('utf-8')).hexdigest()
    return digest[0:n]

def isimageurl(path):
    """Is path a URL (see `vipy.util.isurl`) with an image extension (see `vipy.util.isimg`)?  None returns False."""
    if path is None:
        return False
    return isurl(path) and isimg(path)


def isvideourl(path):
    """Is path a URL (see `vipy.util.isurl`) with a video extension (see `vipy.util.isvideo`)?"""
    if not isurl(path):
        return False
    return isvideo(path)


def isS3url(path):
    """Is path a URL whose scheme is 's3'?"""
    if not isurl(path):
        return False
    return urlparse(path).scheme == 's3'


def isyoutubeurl(path):
    """Is path a URL that contains 'youtube.com' or 'youtu.be'?"""
    if not isurl(path):
        return False
    return any(host in path for host in ('youtube.com', 'youtu.be'))

def isRTSPurl(path):
    """Is path a URL that starts with 'rtsp://'?"""
    if not isurl(path):
        return False
    return path.startswith('rtsp://')

def isRTMPurl(path):
    """Is path a URL that starts with 'rtmp://' or 'rtmps://'?"""
    if not isurl(path):
        return False
    return path.startswith(('rtmp://', 'rtmps://'))


def islist(x):
    """Is an object exactly a python list?  Subclasses of list return False since the type is compared by identity."""
    return type(x) is list


def islistoflists(x):
    """Is an object a python list of lists x=[[1,2], [3,4]]

    Only the first element is inspected.  An empty list returns False.
    """
    return type(x) is list and len(x) > 0 and type(x[0]) is list


def istupleoftuples(x):
    """Is an object a python tuple of tuples x=((1,2), (3,4))

    Only the first element is inspected.  An empty tuple returns False.
    """
    return type(x) is tuple and len(x) > 0 and type(x[0]) is tuple


def isimageobject(x):
    """Is the type of an object exactly vipy.image.Image, ImageCategory or ImageDetection?

    The check compares the printed type name, so no vipy import is needed and subclasses do not match.
    """
    typename = str(type(x))
    return typename in ("<class 'vipy.image.Image'>",
                        "<class 'vipy.image.ImageCategory'>",
                        "<class 'vipy.image.ImageDetection'>")


def isvideotype(x):
    """Is the type of an object exactly vipy.video.Video, VideoCategory or Scene?

    The check compares the printed type name, so no vipy import is needed and subclasses do not match.
    """
    typename = str(type(x))
    return typename in ("<class 'vipy.video.Video'>",
                        "<class 'vipy.video.VideoCategory'>",
                        "<class 'vipy.video.Scene'>")

def isvideoobject(x):
    """Alias for `vipy.util.isvideotype`"""
    return isvideotype(x)


def isvipyobject(x):
    """Is x a vipy image or video, or a list, tuple or dict containing only vipy images or videos?

    Args:
        x: any object

    Returns:
        True if x is a `vipy.image.Image` or `vipy.video.Video` (or subclass), or a list/tuple whose elements
        are all such objects, or a dict whose values are all such objects.  Empty containers return True.
    """
    import vipy.image
    import vipy.video

    def _ismedia(obj):
        return isinstance(obj, (vipy.image.Image, vipy.video.Video))

    if _ismedia(x):
        return True
    if islist(x) or istuple(x):
        # Lists and tuples are both checked element by element; a bare list is not a vipy object by itself
        return all(_ismedia(v) for v in x)
    if isinstance(x, dict):
        return all(_ismedia(v) for v in x.values())
    return False


def istuple(x):
    """Is an object a python tuple (or an instance of a tuple subclass)?"""
    return isinstance(x, tuple)


def tolist(x):
    """Convert a python tuple, set or singleton object to a list.

    A list is returned as-is (the same object), a tuple or set is converted with list(), and anything else
    (including strings and None) is wrapped in a single element list.
    """
    if isinstance(x, list):
        return x
    if isinstance(x, (tuple, set)):
        return list(x)
    return [x]

def toset(x):
    """Convert a python list, tuple or singleton object to a set.

    A set is returned as-is (the same object), a list or tuple is converted with set(), and anything else
    is wrapped in a single element set.
    """
    if isinstance(x, set):
        return x
    if isinstance(x, (list, tuple)):
        return set(x)
    return set([x])
    
    
def tolist_or_singleton(x):
    """Convert x with `vipy.util.tolist`, then return the list if it has more than one element, its only element if it has exactly one, or None if it is empty.  This is useful to return single elements instead of single element lists."""
    items = tolist(x)
    if len(items) > 1:
        return items
    return items[0] if len(items) == 1 else None


def isimg(path):
    """Does the path have a supported image extension ['.jpg','.jpeg','.png','.tif','.tiff','.pgm','.ppm','.gif','.bmp'] (case insensitive)?  None returns False."""
    if path is None:
        return False
    ext = os.path.splitext(path)[1].lower()
    return ext in ['.jpg', '.jpeg', '.png', '.tif', '.tiff', '.pgm', '.ppm', '.gif', '.bmp']

def isimage(path):
    """Alias for `vipy.util.isimg`: does the path have a supported image extension?"""
    return isimg(path)
    
def isvideofile(path):
    """Alias for `vipy.util.isvideo`: does the path have a known video extension?"""
    return isvideo(path)

def isimgfile(path):
    """Alias for `vipy.util.isimg`: does the path have a supported image extension?"""
    return isimg(path)


def isimagefile(path):
    """Alias for `vipy.util.isimg`: does the path have a supported image extension?"""
    return isimg(path)


def isjpeg(path):
    """Is the file extension (as reported by `vipy.util.fileext`) '.jpg' or '.jpeg', case insensitive?

    A path with no extension returns False.
    """
    if not hasextension(path):
        # Checked first so that .lower() is never called on a missing (None) extension
        return False
    return fileext(path).lower() in ('.jpg', '.jpeg')

def iswebp(path):
    """Is the file extension (as reported by `vipy.util.fileext`) '.webp', case insensitive?"""
    if not hasextension(path):
        return False
    return fileext(path).lower() == '.webp'

def ispng(path):
    """Is the file extension (as reported by `vipy.util.fileext`) '.png' or '.apng', case insensitive?"""
    if not hasextension(path):
        return False
    return fileext(path).lower() in ('.png', '.apng')

def isgif(path):
    """Is the file extension (as reported by `vipy.util.fileext`) '.gif', case insensitive?"""
    if not hasextension(path):
        return False
    return fileext(path).lower() == '.gif'

def isjpg(path):
    """Alias for `vipy.util.isjpeg`: is the file a .jpg or .jpeg extension?"""
    return isjpeg(path)


def iscsv(path):
    """Does the path have a '.csv' extension (case insensitive)?  None returns False."""
    if path is None:
        return False
    ext = os.path.splitext(path)[1]
    return ext.lower() in ['.csv', '.CSV']

def isvideo(path):
    """Does the path have a known video extension ['.avi','.mp4','.mov','.wmv','.mpg','.mkv','.webm','.3gp'] (case insensitive)?  None returns False."""
    if path is None:
        return False
    ext = os.path.splitext(path)[1].lower()
    return ext in ['.avi', '.mp4', '.mov', '.wmv', '.mpg', '.mkv', '.webm', '.3gp']


def isnumpy(obj):
    """Is a python object a numpy object?  True if the printed type name of obj contains 'numpy'."""
    typename = str(type(obj))
    return 'numpy' in typename

def isnumpyarray(obj):
    """Is a python object a numpy array?  True if the printed type name of obj contains 'numpy.ndarray'."""
    if not isnumpy(obj):
        return False
    return 'numpy.ndarray' in str(type(obj))


def istextfile(path):
    """Does the path have a '.txt' extension (case insensitive) and not start with a dot?

    The leading dot test is applied to the first character of the whole path, not to the file name.
    None returns False.
    """
    if path is None:
        return False
    (root, ext) = os.path.splitext(path)
    return ext.lower() in ['.txt'] and root[0] != '.'


def isxml(path):
    """Does the path have a '.xml' extension (case insensitive)?"""
    ext = os.path.splitext(path)[1]
    return ext.lower() in ['.xml']


def bgr2gray(im_bgr):
    """Convert a numpy uint8 BGR image to a uint8 numpy grayscale image.

    Args:
        im_bgr: a numpy uint8 array.  A three channel array is interpreted as BGR; any other shape is passed to PIL unchanged.

    Returns:
        A numpy uint8 array produced by PIL conversion to mode 'L'
    """
    img = np.array(im_bgr)
    if img.ndim == 3 and img.shape[2] == 3:
        # PIL interprets three channel arrays as RGB, so reorder BGR -> RGB before the luminance transform
        img = img[:, :, ::-1]
    return np.array(PIL.Image.fromarray(img).convert('L'))


def gray2bgr(im_gray):
    """Convert a numpy float32 grayscale image to a three channel numpy image via PIL: mode 'F' -> 'RGB', then channel order reversed to BGR"""
    im_rgb = PIL.Image.fromarray(im_gray, mode='F').convert('RGB')
    return np.array(im_rgb)[:, :, ::-1]


def gray2rgb(im_gray):
    """Convert a grayscale numpy image to RGB by converting to BGR with `vipy.util.gray2bgr` and then reversing the channel order"""
    return bgr2rgb(gray2bgr(im_gray))


def bgr2rgb(im_bgr):
    """Convert a numpy BGR image to RGB by reversing the order of the third (channel) axis of a copy of the input"""
    pixels = np.array(im_bgr)
    return pixels[:, :, ::-1]


def rgb2bgr(im_rgb):
    """Convert a numpy RGB image to BGR.  This is the same channel reversal as `vipy.util.bgr2rgb`"""
    return bgr2rgb(im_rgb)


def bgr2hsv(im_bgr):
    """Convert a numpy array in BGR order to HSV, by way of RGB and a PIL conversion to mode 'HSV'"""
    im_rgb = bgr2rgb(im_bgr)
    im_hsv = PIL.Image.fromarray(im_rgb).convert('HSV')
    return np.array(im_hsv)


def gray2hsv(im_gray):
    """Convert a numpy array in floating point single channel greyscale order to HSV, by way of RGB and a PIL conversion to mode 'HSV'"""
    im_rgb = gray2rgb(im_gray)
    im_hsv = PIL.Image.fromarray(im_rgb).convert('HSV')
    return np.array(im_hsv)


def isarchive(filename):
    """Does filename have a known archive extension?

    The extension reported by `vipy.util.splitext` must be one of '.egg', '.jar', '.tar', '.tar.bz2',
    '.tar.gz', '.tgz', '.tz2', '.zip', '.gz' (case insensitive), or be a two part extension that ends in
    '.bz2' (such as '.pkl.bz2').  A filename with no extension returns False.
    """
    (_, ext) = splitext(filename)
    if ext is None or len(ext) == 0:
        return False  # no extension at all, so there is nothing to slice below
    if ext.lower() in ['.egg', '.jar', '.tar', '.tar.bz2', '.tar.gz',
                       '.tgz', '.tz2', '.zip', '.gz']:
        return True
    # Two part extension such as '.pkl.bz2': inspect the trailing part only
    (_, inner) = splitext(ext[1:])
    return (inner is not None) and (len(inner) > 0) and (inner.lower() in ['.bz2'])

def istgz(filename):
    """Does the filename end with '.tgz' or '.tar.gz'?"""
    return filename.endswith(('.tgz', '.tar.gz'))

def istar(filename):
    """Does the filename end with '.tar'?"""
    return filename.endswith('.tar')

def istarbz2(filename):
    """Does the filename end with '.tar.bz2'?"""
    return filename.endswith('.tar.bz2')

def tempfilename(suffix):
    """Return a new filename $TEMPDIR/$UUID$suffix in the system temp directory.  The suffix should include the dot such as suffix='.jpg'.  No file is created."""
    name = '%s%s' % (shortuuid(), suffix)
    return os.path.join(tempfile.gettempdir(), name)


def totempdir(filename):
    """Convert a filename '/path/to/filename.ext' to '/tempdir/filename.ext'"""
    name = filetail(filename)
    return os.path.join(tempfile.gettempdir(), name)


def templike(filename):
    """Create a new temporary filename with the same extension as filename.

    If filename has no extension, the temporary filename has no extension either.
    """
    ext = fileext(filename)
    # fileext returns None when there is no extension; do not format that into the name as 'None'
    return tempfilename(ext if ext is not None else '')


def cached(filename):
    """Return the file name of filename placed in the directory named by the VIPY_CACHE environment variable (created if needed), or in the system temp directory if VIPY_CACHE is not set"""
    cachedir = os.environ.get('VIPY_CACHE')
    if cachedir is None:
        return totempdir(filename)
    return os.path.join(remkdir(cachedir), filetail(filename))


def tempimage(ext='jpg'):
    """Return a new temporary filename with the given image extension, which may be given with or without the leading dot"""
    bare = ext[1:] if ext[0] == '.' else ext
    return tempfilename(suffix='.' + bare)


def temppng():
    """Return a new temporary filename with a .png extension (see `vipy.util.tempimage`)"""
    return tempimage('png')


def temppickle():
    """Return a new temporary filename with a .pkl extension (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.pkl')


def tempjpg():
    """Return a new temporary filename with a .jpg extension in the system temp directory (see `vipy.util.tempimage`)"""
    return tempimage('jpg')


def tempMP4():
    """Return a new temporary filename with a .mp4 extension in the system temp directory (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.mp4')

def tempWEBP():
    """Return a new temporary filename with a .webp extension in the system temp directory (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.webp')


def tmpjpg():
    """Return a new filename /tmp/$UUID.jpg, where $UUID is a random uuid4 in hex.  No file is created."""
    token = uuid.uuid4().hex
    return '/tmp/%s.jpg' % token


def tempcsv():
    """Return a new temporary filename with a .csv extension (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.csv')

def temphtml():
    """Return a new temporary filename with a .html extension (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.html')


def temppkl():
    """Alias for `vipy.util.temppickle`: return a new temporary filename with a .pkl extension"""
    return temppickle()


def tempyaml():
    """Return a new temporary filename with a .yml extension (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.yml')


def tempjson():
    """Return a new temporary filename with a .json extension (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.json')


def temppdf():
    """Return a new temporary filename with a .pdf extension (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.pdf')


def mktemp(ext):
    """Return a new temporary filename with extension .ext, where ext is given without the leading dot (see `vipy.util.tempfilename`)"""
    return tempfilename(suffix='.' + ext)


def tempdir():
    """Return the system temp directory.  Wrapper around tempfile.gettempdir(), because I can never remember the syntax"""
    return tempfile.gettempdir()


def imread(imfile):
    """Read an image file with PIL and return it as a numpy array with the channel order reversed.

    Note that color images read by PIL as RGB are returned as BGR!

    Args:
        imfile: the image filename

    Returns:
        A numpy array with the third (channel) axis reversed
    """
    with PIL.Image.open(imfile) as img:
        pixels = np.array(img)  # forces the pixel data to be read before the file is closed
    return pixels[:, :, ::-1]


def imrescale(im, scale):
    """Resize a numpy image by an isotropic scale factor using PIL bilinear interpolation, rounding the new width and height to the nearest integer"""
    height = im.shape[0]
    width = im.shape[1]
    newsize = (int(np.round(scale * width)), int(np.round(scale * height)))  # PIL size is (width, height)
    return np.array(PIL.Image.fromarray(im).resize(newsize, PIL.Image.BILINEAR))


def imresize(im, rows, cols):
    """Resize a numpy image to the given number of rows (height) and columns (width) using PIL bilinear interpolation.

    Args:
        im: a numpy array image
        rows: the output height in pixels
        cols: the output width in pixels

    Returns:
        A numpy array with shape[0] == rows and shape[1] == cols
    """
    # PIL.Image.resize takes size as (width, height), which is (cols, rows)
    return np.array(PIL.Image.fromarray(im).resize((cols, rows), PIL.Image.BILINEAR))


def touch(filename, mystr=''):
    """Create (or overwrite) a file containing str(mystr), which is empty by default

    Args:
        filename: the file to write
        mystr: the content, converted with str()
    """
    # The context manager closes the file even if the write raises
    with open(filename, 'w') as f:
        f.write(str(mystr))


class Stopwatch(object):
    """Measure elapsed system time in seconds, either directly or between calls to enter and exit.

    Attributes set by this class:
        start: the time of construction, of the last reset(), or of entering the context
        last: the time of the most recent reset, context entry, or since() call
        end, elapsed: set on context exit, where elapsed = end - start
    """

    def __init__(self):
        self.reset()

    def __enter__(self):
        now = time.time()
        self.start = now
        self.last = now
        return self

    def __exit__(self, *args):
        self.end = time.time()
        self.elapsed = self.end - self.start

    def since(self, start=False):
        """Return seconds since start (if start=True) or since the last call to this method"""
        now = time.time()
        reference = self.start if start is True else self.last
        self.last = now
        return now - reference

    def reset(self):
        """Restart the stopwatch at the current time and return self"""
        now = time.time()
        self.start = now
        self.last = now
        return self

    def duration(self):
        """Time in seconds since last reset"""
        return time.time() - self.start

    
class Timer(object):
    """Pretty print elapsed system time in seconds between calls to enter and exit
    
    ```python 
       t = Timer()
       [some code]
       print(t)
       [some more code]
       print(t)

       with Timer():
          [some code]
    ```

    Each call to repr() (including print) reports the time since the previous report and starts a new lap.
    The first report uses sprintf_first with one number (elapsed seconds), and later reports use sprintf_next
    with two numbers (seconds since the previous report, total seconds).

    Args:
        sprintf_next: a sprintf style string with two number variables, or None for the default
        sprintf_first: a sprintf style string with one number variable, or None for the default

    Raises:
        ValueError: if either format string cannot be formatted with the required numbers
    """
    def __enter__(self):
        self._begin = time.time()
        self._last = self._begin
        return self
        
    def __exit__(self, *args):
        print(self.__repr__())

    def __init__(self, sprintf_next=None, sprintf_first=None):
        self._sprintf_next = '[vipy.util.timer]: elapsed=%1.6fs, total=%1.6fs' if sprintf_next is None else sprintf_next
        self._sprintf_first = '[vipy.util.timer]: elapsed=%1.6fs' if sprintf_first is None else sprintf_first
        self._begin = time.time()
        self._last = self._begin
        self._laps = 0        
        try:
            # Dry run both formats so that a bad format string fails at construction, not at print time
            self._sprintf_next % (1.0, 1.0)
            self._sprintf_first % (1.0)            
        except (TypeError, ValueError) as e:
            raise ValueError('Printed display string must be a sprintf style string with one or two number variable like "Elapsed=%1.6f since=%1.6f"') from e
            
    def __repr__(self):
        now = time.time()
        if self._laps > 0:
            s = str(self._sprintf_next % (now - self._last, now - self._begin))
        else:
            s = str(self._sprintf_first % (now - self._begin))
        # Reporting has a side effect: it starts the next lap
        self._last = now
        self._laps += 1
        return s

        
def isfile(path):
    """Wrapper for os.path.isfile.  The path is converted with str() first, so non-string inputs such as None return False rather than raising (unless a file with that name exists)"""
    return os.path.isfile(str(path))


def isstring(s):
    """Is an object a python string (an instance of str)?"""
    return isinstance(s, str)  # python3


def timestamp():
    """Return the local date and time as an upper case string in the form DDMMMYY_HHMMSS followed by AM or PM, using a 12-hour clock"""
    stamp = strftime("%d%b%y_%I%M%S%p", localtime())
    return stamp.upper()

def clockstamp():
    """Datetime stamp in local timezone with second resolution with format Year-Month-Day Hour:Minute:Second"""
    now = datetime.now()
    return now.strftime("%Y-%m-%d %H:%M:%S")

def minutestamp():
    """Return the local date and time as an upper case string in the form DDMMMYY_HHMM followed by AM or PM, using a 12-hour clock"""
    stamp = strftime("%d%b%y_%I%M%p", localtime())
    return stamp.upper()


def datestamp():
    """Return the local date as an upper case string in the form DDMMMYY"""
    stamp = strftime("%d%b%y", localtime())
    return stamp.upper()


def remkdir(path, flush=False):
    """Create a given directory if not already exists.

    Args:
        path: the directory to create.  A leading '~' is expanded and the path is made absolute before use.
        flush: if True and the directory already exists, recursively delete it and create it again empty

    Returns:
        The absolute, user-expanded path of the directory.  An empty path creates nothing and returns the
        current working directory.

    Raises:
        FileNotFoundError: if path is empty and flush is True
    """
    target = os.path.abspath(os.path.expanduser(path))
    if len(path) == 0:
        # An empty path resolves to the working directory, which must never be created or flushed implicitly
        if flush is True:
            raise FileNotFoundError('Cannot flush an empty path')
        return target
    if not os.path.isdir(target):
        os.makedirs(target, exist_ok=True)  # tolerate another process creating it concurrently
    elif flush is True:
        shutil.rmtree(target)
        os.makedirs(target)
    return target


def rermdir(path):
    """Recursively delete a given directory (if exists), and remake it.  Equivalent to `vipy.util.remkdir` with flush=True"""
    return remkdir(path, flush=True)


def premkdir(filename):
    """pre-create directory /path/to/subdir using `vipy.util.remkdir` if it does not exist for outfile=/path/to/subdir/file.ext, and return filename"""
    parent = filepath(filename)
    remkdir(parent)
    return filename


def newbase(filename, base):
    """Convert filename=/a/b/c.ext base=d -> /a/b/d.ext"""
    directory = filepath(filename)
    ext = fileext(filename, withdot=False)
    return os.path.join(directory, '%s.%s' % (base, ext))

def toextension(filename, newext):
    """Convert filename='/path/to/myfile.ext' to /path/to/myfile.xyz, such that newext='xyz' or newext='.xyz'.

    .. note:: Only the text after the last dot of newext is used, so newext='tar.gz' yields an extension of '.gz'
    """
    suffix = newext.split('.')[-1] if '.' in newext else newext
    stem = splitext(filename)[0]
    return stem + '.' + str(suffix)

def noextension(filename, ext=None):
    """Convert filename='/path/to/myfile.ext' or filename='/path/to/myfile.ext1.ext2.ext3' to /path/to/myfile with no extension.

    Args:
        filename: [str] a path to a file
        ext: [str] if None, remove everything past the first dot in the file name.  Otherwise, remove this extension string (e.g. '.json') from the file name.

    Returns:
        The filename with the extension removed.  Only the final path component is modified, so dots in directory names (e.g. /path.to/myfile.ext or ./myfile.ext) are preserved.
    """
    tail = os.path.basename(filename)
    prefix = filename[:len(filename) - len(tail)]  # directory part, kept byte-for-byte
    stem = tail.split('.')[0] if ext is None else tail.replace(ext, '')
    return prefix + stem

def topkl(filename):
    """Convert filename='/path/to/myfile.ext' to /path/to/myfile.pkl using `vipy.util.toextension`"""
    pklfile = toextension(filename, '.pkl')
    return pklfile

def splitext(filename):
    """Given /a/b/c.ext return tuple of strings ('/a/b/c', '.ext'), handling multi-dot extensions like .tar.gz.

    Returns:
        A tuple (path without extension, extension with leading dot).  The extension is None if the filename has no extension.
    """
    (head, tail) = os.path.split(filename)
    ext = fileext(filename, multidot=True, withdot=True)
    # Strip the extension from the end only: str.replace would also delete earlier occurrences (e.g. 'v.1.0.1.0' -> 'v')
    base = tail[:-len(ext)] if ext is not None else tail
    return (os.path.join(head, base), ext)  # for consistency with os.path.splitext


def hasextension(filename):
    """Return True if the provided filename has a file extension (e.g. /path/to/file.ext), False if not (e.g. /path/to/file)"""
    ext = fileext(filename)
    return ext is not None


def fileext(filename, multidot=True, withdot=True):
    """Given filename /a/b/c.ext return '.ext', or /a/b/c.tar.gz return '.tar.gz'.   If multidot=False, then return '.gz'.  If withdot=False, return 'ext'.  Multidot support at most two trailing dots

    Returns:
        The extension string, or None if the filename has no extension (for any value of withdot)
    """
    (head, tail) = os.path.split(filename)
    try:
        parts = str.rsplit(tail, '.', 2)
    except TypeError:
        return None  # non-string tail (e.g. a bytes path) has no reportable extension
    if len(parts) < 2:
        return None  # no dot in the file name
    if len(parts) == 3 and multidot:
        ext = '.%s.%s' % (parts[1], parts[2])  # .tar.gz
    else:
        ext = '.' + parts[-1]  # .gz or .mp4
    return ext if withdot else ext[1:]

def mediaextension(filename):
    """Return '.mp4' for filename='/a/b/c.mp4'.  Only the text after the last dot is returned (no multi-dot extensions)"""
    ext = fileext(filename, multidot=False)
    return ext

def ismacosx():
    """Is the current platform MacOSX?"""
    import platform  # local import: unlike os.uname, platform.system is available on every platform (e.g. Windows)
    return platform.system() == 'Darwin'


def islinux():
    """Is the current platform Linux?"""
    import platform  # local import: unlike os.uname, platform.system is available on every platform (e.g. Windows)
    return platform.system() == 'Linux'


def linuxversion():
    """Return the linux release version as a 'major.minor' string parsed from /etc/redhat-release.

    Returns:
        The version string, or None if the platform is not linux, the release file is missing or unreadable, or no version could be parsed from its first line
    """
    if not islinux():
        return None
    try:
        with open('/etc/redhat-release') as f:
            firstline = f.readline()
    except OSError:
        return None  # release file not present (e.g. not a RedHat family distribution)
    m = re.match(r'[a-zA-Z ]+([0-9]+\.[0-9]+)', firstline)
    return m.group(1) if m is not None else None


def imcrop(img, bbox):
    """Crop a 2D or 3D numpy image given a vipy.geometry.BoundingBox.

    Args:
        img: numpy array indexed as img[row, column] = img[y, x]
        bbox: an object with xmin(), xmax(), ymin(), ymax() methods.  Coordinates are rounded to the nearest integer pixel.

    Returns:
        The cropped array img[ymin:ymax, xmin:xmax]
    """
    (xmin, xmax) = (int(round(bbox.xmin())), int(round(bbox.xmax())))
    (ymin, ymax) = (int(round(bbox.ymin())), int(round(bbox.ymax())))
    return img[ymin:ymax, xmin:xmax]  # rows are y, columns are x


class Failed(Exception):
    """Exception raised when a unit test fails to throw an expected exception"""


def string_to_pil_interpolation(interp):
    """Internal function to convert interp string to interp object.

    Args:
        interp: [str] one of 'bilinear', 'bicubic' or 'nearest'

    Returns:
        The matching PIL.Image resampling constant
    """
    assert interp in ['bilinear', 'bicubic', 'nearest'], "Invalid interp - Must be in ['bilinear', 'bicubic', 'nearest']"
    if interp == 'bilinear':
        return PIL.Image.BILINEAR
    if interp == 'bicubic':
        return PIL.Image.BICUBIC
    if interp == 'nearest':
        return PIL.Image.NEAREST
    # Only reachable when assertions are disabled (python -O): a bare 'raise' here would fail with an unrelated RuntimeError
    raise ValueError("Invalid interp - Must be in ['bilinear', 'bicubic', 'nearest']")

def symlink(src, dst, overwrite=False):
    """Create a symlink at dst that points to src, and return dst.

    If overwrite=True and dst is already a symlink, the existing link is removed first.  An existing regular file or directory at dst is never removed.
    """
    if overwrite:
        if os.path.islink(dst):
            os.unlink(dst)
    os.symlink(src, dst)
    return dst

Functions

def bgr2gray(im_bgr)

Wrapper for numpy uint8 BGR image to uint8 numpy grayscale

Expand source code Browse git
def bgr2gray(im_bgr):
    """Wrapper for numpy uint8 BGR image to uint8 numpy grayscale.

    The channels are reordered to RGB before conversion, since PIL assumes RGB channel order when computing luminance.
    Single channel (2D) input is passed to PIL unchanged.
    """
    img = np.array(im_bgr)
    if img.ndim == 3 and img.shape[2] >= 3:
        order = [2, 1, 0] + list(range(3, img.shape[2]))  # BGR -> RGB, keeping any trailing (alpha) channel in place
        img = np.ascontiguousarray(img[:, :, order])
    return np.array(PIL.Image.fromarray(img).convert('L'))
def bgr2hsv(im_bgr)

Convert a numpy array in BGR order to HSV

Expand source code Browse git
def bgr2hsv(im_bgr):
    """Convert a numpy array in BGR channel order to HSV, by way of RGB"""
    im_rgb = bgr2rgb(im_bgr)  # BGR -> RGB
    pil_hsv = PIL.Image.fromarray(im_rgb).convert('HSV')  # RGB -> HSV
    return np.array(pil_hsv)
def bgr2rgb(im_bgr)

Wrapper for numpy BGR uint8 to numpy RGB uint8

Expand source code Browse git
def bgr2rgb(im_bgr):
    """Reverse the order of the third (channel) axis of an image array, converting numpy BGR to numpy RGB"""
    pixels = np.array(im_bgr)
    return pixels[:, :, ::-1]
def cache()

If the VIPY_CACHE environment variable set, return it otherwise return tempdir()

Expand source code Browse git
def cache():
    """Return the directory named by the VIPY_CACHE environment variable (creating it if needed) if set, otherwise return tempdir()"""
    if hascache():
        return remkdir(os.environ['VIPY_CACHE'])
    return tempdir()
def cached(filename)

Create a new filename in the cache, or tempdir if not found

Expand source code Browse git
def cached(filename):
    """Return a path for the tail of filename in the VIPY_CACHE directory (created if needed), or in the tempdir if VIPY_CACHE is not set"""
    if 'VIPY_CACHE' not in os.environ:
        return totempdir(filename)
    cachedir = remkdir(os.environ['VIPY_CACHE'])
    return os.path.join(cachedir, filetail(filename))
def canload(infile)

Attempt to load an archive file, and return true if it can be successfully loaded, otherwise False

Expand source code Browse git
def canload(infile):
    """Attempt to load an archive file, and return true if it can be successfully loaded, otherwise False"""
    try:
        load(infile, abspath=True)
    except Exception:  # not a bare except: KeyboardInterrupt and SystemExit must propagate
        return False
    return True
def catcher(f, *args, **kwargs)

Call the function f with the provided arguments, and return (True, result) on success and (False, str(exception)) if there is any thrown exception. Useful for parallel processing

Expand source code Browse git
def catcher(f, *args, **kwargs):
    """Call the function f with the provided arguments, and return (True, result) on success and (False, str(exception)) if there is any thrown exception.  Useful for parallel processing"""
    assert callable(f)
    try:
        result = f(*args, **kwargs)
    except Exception as err:
        return (False, str(err))
    return (True, result)
def catchif(f, *args, **kwargs)

Call the function f with the provided arguments, and return (result) on success and (None) if there is any thrown exception. Useful for parallel processing. Alias for trycatcher()

Expand source code Browse git
def catchif(f, *args, **kwargs):
    """Alias for `vipy.util.trycatcher`: call f with the provided arguments and forward whatever trycatcher returns (documented as the result on success, or None if any exception is thrown).  Useful for parallel processing."""
    outcome = trycatcher(f, *args, **kwargs)
    return outcome
def chunklist(inlist, num_chunks)

Convert list into a list of lists of length num_chunks, such that each element is a list containing a sequential chunk of the original list.

(A,B,C) = vipy.util.chunklist(inlist, num_chunks=3)
assert len(A) == len(inlist) // 3

Note: Each chunk has ceil(len(inlist)/num_chunks) elements, so the last chunk will be smaller (or empty) for ragged chunks

Expand source code Browse git
def chunklist(inlist, num_chunks):
    """Convert list into a list of num_chunks lists, such that each element is a list containing a sequential chunk of the original list.

    ```python
    (A,B,C) = vipy.util.chunklist(inlist, num_chunks=3)
    assert len(A) == int(np.ceil(len(inlist) / 3))
    ```

    .. note::  Every chunk has ceil(len(inlist)/num_chunks) elements, except the trailing chunks which will be smaller (or empty) for ragged chunks
    """
    chunksize = int(np.ceil(float(len(inlist)) / float(num_chunks)))
    chunks = []
    for k in range(0, num_chunks):
        start = k * chunksize
        chunks.append(inlist[start:min(start + chunksize, len(inlist))])
    return chunks
def chunklistWithOverlap(inlist, size_per_chunk, overlap_per_chunk)

Convert list into a list of lists such that each element is a list containing a sequential chunk of the original list of length size_per_chunk

Expand source code Browse git
def chunklistWithOverlap(inlist, size_per_chunk, overlap_per_chunk):
    """Convert list into a list of lists such that each element is a sequential chunk of the original list of
    length size_per_chunk, where consecutive chunks share overlap_per_chunk elements.  The final chunk may be shorter."""
    assert size_per_chunk >= 1 and overlap_per_chunk >= 0 and size_per_chunk > overlap_per_chunk
    stride = size_per_chunk - overlap_per_chunk
    chunks = []
    for start in range(0, len(inlist) - overlap_per_chunk, stride):
        chunks.append(inlist[start:start + size_per_chunk])
    return chunks
def chunklistbysize(inlist, size_per_chunk)

Convert list into a list of lists such that each element is a list containing a sequential chunk of the original list of length size_per_chunk

Expand source code Browse git
def chunklistbysize(inlist, size_per_chunk):
    """Convert list into a list of lists such that each element is a sequential, non-overlapping chunk
    of the original list of length size_per_chunk.  The final chunk may be shorter."""
    assert size_per_chunk >= 1
    chunks = []
    for start in range(0, len(inlist), size_per_chunk):
        chunks.append(inlist[start:start + size_per_chunk])
    return chunks
def chunklistwithoverlap(inlist, size_per_chunk, overlap_per_chunk)

Alias for chunklistWithOverlap

Expand source code Browse git
def chunklistwithoverlap(inlist, size_per_chunk, overlap_per_chunk):
    """Alias for `vipy.util.chunklistWithOverlap`"""
    chunks = chunklistWithOverlap(inlist, size_per_chunk, overlap_per_chunk)
    return chunks
def class_registry()

Return a dictionary mapping str(type(obj)) to a JSON loader for all vipy objects.

This function is useful for JSON loading of vipy objects to map to the correct deserialization method.

Expand source code Browse git
def class_registry():
    """Return a dictionary mapping str(type(obj)) to a JSON loader for all vipy objects.

    This function is useful for JSON loading of vipy objects to map to the correct deserialization method.

    The pycollector loaders are optional: if a pycollector module cannot be imported, its key maps to a function that raises ValueError when called.
    The key None maps to pickle loads, as a fallback for generic pickled objects.
    """

    import vipy.video
    import vipy.image
    import vipy.dataset

    # NOTE(review): vipy.geometry, vipy.object and vipy.activity are not imported here - presumably they are loaded transitively by the imports above; confirm
    registry = {}
    registry["<class 'vipy.video.Scene'>"] = vipy.video.Scene.from_json
    registry["<class 'vipy.video.Video'>"] = vipy.video.Video.from_json
    registry["<class 'vipy.video.VideoCategory'>"] = vipy.video.VideoCategory.from_json
    registry["<class 'vipy.image.Image'>"] = vipy.image.Image.from_json
    registry["<class 'vipy.image.ImageCategory'>"] = vipy.image.ImageCategory.from_json
    registry["<class 'vipy.image.ImageDetection'>"] = vipy.image.ImageDetection.from_json
    registry["<class 'vipy.image.Scene'>"] = vipy.image.Scene.from_json
    registry["<class 'vipy.geometry.BoundingBox'>"] = vipy.geometry.BoundingBox.from_json
    registry["<class 'vipy.object.Track'>"] = vipy.object.Track.from_json
    registry["<class 'vipy.object.Detection'>"] = vipy.object.Detection.from_json
    registry["<class 'vipy.activity.Activity'>"] = vipy.activity.Activity.from_json
    registry["<class 'vipy.dataset.Dataset'>"] = vipy.dataset.Dataset.from_json

    # Optional loaders: on any failure to import, register a stub that raises ValueError when it is called
    try:
        import pycollector.video
        registry["<class 'pycollector.video.Video'>"] = pycollector.video.Video.from_json
    except:
        registry["<class 'pycollector.video.Video'>"] = lambda x: exec("raise ValueError(\"<class 'pycollector.video.Video'> not found - Run 'pip install pycollector' \")")
    try:
        import pycollector.admin.video
        registry["<class 'pycollector.admin.video.Video'>"] = pycollector.admin.video.Video.from_json
    except:
        registry["<class 'pycollector.admin.video.Video'>"] = lambda x: exec("raise ValueError(\"<class 'pycollector.admin.video.Video'> not found - This is for admin use only \")")

    registry[None] = cPickle.loads  # fallback on generic pickle dumps
    return registry
def clockstamp()

Datetime stamp in local timezone with second resolution with format Year-Month-Day Hour:Minute:Second

Expand source code Browse git
def clockstamp():
    """Return the current local datetime at second resolution, formatted as 'Year-Month-Day Hour:Minute:Second'"""
    now = datetime.now()
    return now.strftime("%Y-%m-%d %H:%M:%S")
def count_images_in_subdirectories(indir)

Count the total number of images in indir/subdir1, indir/subdir2, go down only one level and no further…

Expand source code Browse git
def count_images_in_subdirectories(indir):
    """Count the total number of images in indir/subdir1, indir/subdir2, go down only one level and no further..."""
    # Iterate over the subdirectories of the indir argument (the name 'outdir' used previously was undefined)
    return sum(len(imlist(d)) for d in dirlist(indir))
def countby(inset, keyfunc=<function <lambda>>)

Return dictionary of keys and group sizes for a grouping of the input list by keyfunc lambda function, sorted by increasing count

Expand source code Browse git
def countby(inset, keyfunc=lambda x: x):
    """Return dictionary of keys and group sizes for a grouping of the input list by keyfunc lambda function, with keys inserted in order of increasing count"""
    groupsize = {k: len(v) for (k, v) in groupbyasdict(inset, keyfunc).items()}
    return dict(sorted(groupsize.items(), key=lambda kv: kv[1]))
def countbyasdict(inset, keyfunc)

Alias for countby()

Expand source code Browse git
def countbyasdict(inset, keyfunc):
    """Alias for `vipy.util.countby`, with a required keyfunc"""
    counts = countby(inset, keyfunc)
    return counts
def csvlist(imdir)

Return a list of absolute paths of *.csv files in current directory

Expand source code Browse git
def csvlist(imdir):
    """Return a list of paths (imdir joined with the file name) for the *.csv files found directly in the directory imdir"""
    csvfiles = []
    for item in os.listdir(imdir):
        if iscsv(item):
            csvfiles.append(os.path.join(imdir, item))
    return csvfiles
def datestamp()

Return date and time string in form DDMMMYY

Expand source code Browse git
def datestamp():
    """Return the local date as an uppercase string in the form DDMMMYY"""
    stamp = strftime("%d%b%y", localtime())
    return stamp.upper()
def dedupe(inlist, f)

Deduplicate the list using the provided lambda function which transforms an element to a dedupe key, such that all elements with the same key are duplicates

Expand source code Browse git
def dedupe(inlist, f):
    """Deduplicate the list using the provided lambda function which transforms an element to a dedupe key, such that all elements with the same key are duplicates.

    For each key the last element in the list with that key is kept.  The output is ordered by the first appearance of each key.
    """
    assert callable(f)
    assert isinstance(inlist, list)
    unique = {}
    for x in inlist:
        unique[f(x)] = x  # later duplicates overwrite earlier ones
    return list(unique.values())
def delpath(indir, filename)

Return c/d.ext for filename /a/b/c/d.ext and indir /a/b

Expand source code Browse git
def delpath(indir, filename):
    """Return c/d.ext for filename /a/b/c/d.ext and indir /a/b.

    Only the first occurrence of indir is removed, so /a/b/c/a/b/d.ext with indir /a/b returns c/a/b/d.ext
    """
    assert indir in filename, 'Path "%s" not found in filename "%s"' % (indir, filename)
    indir = os.path.join(indir, '')  # /a/b -> /a/b/
    return filename.split(indir, 1)[1]  # maxsplit=1: keep everything after the first occurrence
def dirlist(indir)

return list of absolute paths to subdirectories in a directory

Expand source code Browse git
def dirlist(indir):
    """Return a list of absolute paths to the non-hidden subdirectories found directly in a directory"""
    subdirs = []
    for item in os.listdir(indir):
        candidate = os.path.join(indir, item)
        if os.path.isdir(candidate) and not is_hiddenfile(item):
            subdirs.append(os.path.abspath(candidate))
    return subdirs
def dirlist_sorted_bycreation(indir)

Sort the directory list from newest first to oldest last by creation date

Expand source code Browse git
def dirlist_sorted_bycreation(indir):
    """Sort the directory list from `vipy.util.dirlist` from newest first to oldest last, using the st_ctime field of os.stat as the creation date"""
    def ctime(d):
        return os.stat(d).st_ctime

    subdirs = dirlist(indir)
    return sorted(subdirs, key=ctime, reverse=True)
def dirload(indir)

Load a directory by recursively searching for loadable archives and loading them into a flat list

Expand source code Browse git
def dirload(indir):
    """Load a directory by recursively searching for loadable archives, and concatenating the elements of each loaded archive into a flat list"""
    loaded = []
    for f in findloadable(indir):
        loaded.extend(load(f))
    return loaded
def dividelist(inlist, fractions)

Divide inlist into a list of lists such that the size of each sublist is the requested fraction of the original list.

This operation is deterministic and generates the same division in multiple calls.

Args

inlist
[list]
fractions
[tuple] such as (0.1, 0.7, 0.2) An iterable of fractions that must be non-negative and sum to one
Expand source code Browse git
def dividelist(inlist, fractions):
    """Divide inlist into a list of lists such that the size of each sublist is the requested fraction of the original list. 

       This operation is deterministic and generates the same division in multiple calls.

       Sublist sizes are rounded up to ceil(fraction*len(inlist)), so trailing sublists receive fewer elements when the sizes do not divide evenly.
       
    Args:
        inlist: [list]
        fractions: [tuple] such as (0.1, 0.7, 0.2)   An iterable of fractions that must be non-negative and sum to one (within floating point tolerance)

    Returns:
        A list of len(fractions) lists, which are sequential slices of inlist
    """
    fractions = list(fractions)  # allow a one-shot iterable to be traversed more than once
    assert all([f >= 0 and f <=1 for f in fractions])
    # Tolerant comparison: decimal fractions are not exact in floating point, so their sum may differ from one by rounding error
    assert np.isclose(np.sum(fractions), 1.0, rtol=0, atol=1e-9)
    assert len(inlist) >= len(fractions)
    N = np.int32(np.maximum(0, np.ceil(len(inlist)*np.array(fractions))))
    outlist = []
    for n in N:
        outlist.append(inlist[0:n])
        inlist = inlist[n:]  # rebinds the local name only, the input list is not modified
    return outlist
def extlist(indir, ext)

return list of files with absolute path in a directory that have the provided extension (with the prepended dot, ext='.mp4')

Expand source code Browse git
def extlist(indir, ext):
    """Return a list of absolute paths to the files found directly in a directory that have
    the provided extension (with the prepended dot, ext='.mp4'), compared case-insensitively"""
    matches = []
    for item in os.listdir(indir):
        itemext = fileext(item)
        if itemext is not None and itemext.lower() == ext.lower():
            matches.append(os.path.abspath(os.path.join(indir, item)))
    return matches
def filebase(filename)

Return c for filename /a/b/c.ext

Warning: Will return /a/b/c.d for multidot filenames with more than two trailing dots like /a/b/c.d.e.f (e.g. /a/b/my.filename.tar.gz)

Expand source code Browse git
def filebase(filename):
    """Return c for filename /a/b/c.ext
    
    .. warning:: Will return c.d for multidot filenames with more than two trailing dots like /a/b/c.d.e.f (e.g. my.filename for /a/b/my.filename.tar.gz)
    """
    tail = os.path.split(filename)[1]
    return splitext(tail)[0]
def fileext(filename, multidot=True, withdot=True)

Given filename /a/b/c.ext return '.ext', or /a/b/c.tar.gz return '.tar.gz'. If multidot=False, then return '.gz'. If withdot=False, return 'ext'. Multidot support at most two trailing dots

Expand source code Browse git
def fileext(filename, multidot=True, withdot=True):
    """Given filename /a/b/c.ext return '.ext', or /a/b/c.tar.gz return '.tar.gz'.   If multidot=False, then return '.gz'.  If withdot=False, return 'ext'.  Multidot support at most two trailing dots

    Returns:
        The extension string, or None if the filename has no extension (for any value of withdot)
    """
    (head, tail) = os.path.split(filename)
    try:
        parts = str.rsplit(tail, '.', 2)
    except TypeError:
        return None  # non-string tail (e.g. a bytes path) has no reportable extension
    if len(parts) < 2:
        return None  # no dot in the file name
    if len(parts) == 3 and multidot:
        ext = '.%s.%s' % (parts[1], parts[2])  # .tar.gz
    else:
        ext = '.' + parts[-1]  # .gz or .mp4
    return ext if withdot else ext[1:]
def filefull(f)

Return /a/b/c for filename /a/b/c.ext

Expand source code Browse git
def filefull(f):
    """Return /a/b/c for filename /a/b/c.ext.  A filename without an extension is returned unchanged."""
    ext = fileext(f, multidot=True, withdot=True)
    # Strip the extension from the end only: str.replace would also delete it from directory names (e.g. /a/x.json/y.json)
    return f[:-len(ext)] if ext is not None else f
def filepath(filename, depth=0)

Return /a/b/c for filename /a/b/c/d.ext, /a/b for filename /a/b/c/d.ext if depth=1, etc

Expand source code Browse git
def filepath(filename, depth=0):
    """Return /a/b/c for filename /a/b/c/d.ext, /a/b for filename /a/b/c/d.ext if depth=1, etc"""
    parent = os.path.dirname(filename)
    for _ in range(depth):
        parent = os.path.dirname(parent)  # climb one more directory level
    return parent
def filetail(filename)

Return c.ext for filename /a/b/c.ext

Expand source code Browse git
def filetail(filename):
    """Return c.ext for filename /a/b/c.ext"""
    return os.path.basename(filename)
def findimage(basedir)

Return a list of absolute paths to image files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findimage(basedir):
    """Return a list of absolute paths to image files (as tested by isimage) recursively discovered by walking the directory tree rooted at basedir"""
    found = []
    for path in pathlib.Path(basedir).rglob('*'):
        resolved = str(path.resolve())
        if isimage(resolved):
            found.append(resolved)
    return found
def findimages(basedir)

Alias for findimage()

Expand source code Browse git
def findimages(basedir):
    """Alias for `vipy.util.findimage`"""
    images = findimage(basedir)
    return images
def findjson(basedir)

Return a list of absolute paths to json files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findjson(basedir):
    """Return a list of absolute paths to *.json files recursively discovered by walking the directory tree rooted at basedir"""
    matches = pathlib.Path(basedir).rglob('*.json')
    return [str(p.resolve()) for p in matches]
def findloadable(basedir)

Return a list of absolute paths to any archive file loadable by vipy.load (.pkl, .json, *.pkl.bz2). Recursively search starting from basedir

Expand source code Browse git
def findloadable(basedir):
    """Return a list of absolute paths to any archive file loadable by `vipy.load` (*.pkl, *.json, *.pkl.bz2).  Recursively search starting from basedir.

    The returned list is ordered as all *.pkl files, then all *.json files, then all *.pkl.bz2 files
    """
    loadable = findpkl(basedir)
    loadable = loadable + findjson(basedir)
    loadable = loadable + findpklbz2(basedir)
    return loadable
def findpdf(basedir)

Return a list of absolute paths to pdf files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findpdf(basedir):
    """Return a list of absolute paths to *.pdf files recursively discovered by walking the directory tree rooted at basedir"""
    matches = pathlib.Path(basedir).rglob('*.pdf')
    return [str(p.resolve()) for p in matches]
def findpkl(basedir)

Return a list of absolute paths to pkl files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findpkl(basedir):
    """Return a list of absolute paths to *.pkl files recursively discovered by walking the directory tree rooted at basedir"""
    found = []
    for p in pathlib.Path(basedir).rglob('*.pkl'):
        found.append(str(p.resolve()))
    return found
def findpklbz2(basedir)

Return a list of absolute paths to .pkl.bz2 files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findpklbz2(basedir):
    """Return a list of absolute paths to *.pkl.bz2 files recursively discovered by walking the directory tree rooted at basedir"""
    found = []
    for p in pathlib.Path(basedir).rglob('*.pkl.bz2'):
        found.append(str(p.resolve()))
    return found
def findpng(basedir)

Return a list of absolute paths to png files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findpng(basedir):
    """Return a list of absolute paths to *.png files recursively discovered by walking the directory tree rooted at basedir"""
    matches = pathlib.Path(basedir).rglob('*.png')
    return [str(p.resolve()) for p in matches]
def findtar(basedir)

Return a list of absolute paths to tar files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findtar(basedir):
    """Return a list of absolute paths to *.tar files recursively discovered by walking the directory tree rooted at basedir"""
    found = []
    for p in pathlib.Path(basedir).rglob('*.tar'):
        found.append(str(p.resolve()))
    return found
def findvideo(basedir)

Return a list of absolute paths to video files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findvideo(basedir):
    """Return a list of absolute paths to video files (as tested by isvideo) recursively discovered by walking the directory tree rooted at basedir"""
    found = []
    for path in pathlib.Path(basedir).rglob('*'):
        resolved = str(path.resolve())
        if isvideo(resolved):
            found.append(resolved)
    return found
def findvideos(basedir)

Alias for findvideo()

Expand source code Browse git
def findvideos(basedir):
    """Alias for `vipy.util.findvideo`"""
    videos = findvideo(basedir)
    return videos
def findwebp(basedir)

Return a list of absolute paths to webp files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findwebp(basedir):
    """Return a list of absolute paths to webp files (as tested by iswebp) recursively discovered by walking the directory tree rooted at basedir"""
    found = []
    for path in pathlib.Path(basedir).rglob('*'):
        resolved = str(path.resolve())
        if iswebp(resolved):
            found.append(resolved)
    return found
def findyaml(basedir)

Return a list of absolute paths to yaml files recursively discovered by walking the directory tree rooted at basedir

Expand source code Browse git
def findyaml(basedir):
    """Return a list of absolute paths to yaml files recursively discovered by walking the directory tree rooted at basedir.

    .. note:: Only files matching *.yml are returned, files named *.yaml are not matched
    """
    matches = pathlib.Path(basedir).rglob('*.yml')
    return [str(p.resolve()) for p in matches]
def flatlist(inlist)

Convert list of tuples into a list expanded by concatenating tuples. If the input is already flat, return it unchanged.

Expand source code Browse git
def flatlist(inlist):
    """Convert list of tuples (or lists) into a list expanded by concatenating their elements, flattening one level only.  Elements that are not a list or tuple are kept as is, so an already flat input is returned as an equal list."""
    flat = []
    for r in inlist:
        if isinstance(r, (list, tuple)):
            flat.extend(r)
        else:
            flat.append(r)
    return flat
def gray2bgr(im_gray)

Wrapper for numpy float32 gray image to uint8 numpy BGR

Expand source code Browse git
def gray2bgr(im_gray):
    """Wrapper for numpy float32 gray image to uint8 numpy BGR, by way of RGB"""
    pil_rgb = PIL.Image.fromarray(im_gray, mode='F').convert('RGB')  # Gray -> RGB
    im_rgb = np.array(pil_rgb)
    return im_rgb[:,:,::-1]  # RGB -> BGR
def gray2hsv(im_gray)

Convert a numpy array in floating point single channel greyscale order to HSV

Expand source code Browse git
def gray2hsv(im_gray):
    """Convert a numpy array in floating point single channel greyscale order to HSV, by way of RGB"""
    im_rgb = gray2rgb(im_gray)  # Gray -> RGB
    pil_hsv = PIL.Image.fromarray(im_rgb).convert('HSV')  # RGB -> HSV
    return np.array(pil_hsv)
def gray2jet(img)

[0,1] grayscale to [0,255] RGB

Expand source code Browse git
def gray2jet(img):
    """[0,1] grayscale to [0,255] uint8 RGB using the matplotlib 'jet' colormap"""
    import matplotlib.pyplot as plt
    colormap = plt.get_cmap('jet')
    rgba = colormap(img)
    return np.uint8(255.0 * rgba[:, :, 0:3])  # first three channels only
def gray2rgb(im_gray)
Expand source code Browse git
def gray2rgb(im_gray):
    """Convert a gray image to RGB by composing `vipy.util.gray2bgr` and `vipy.util.bgr2rgb`"""
    im_bgr = gray2bgr(im_gray)
    return bgr2rgb(im_bgr)
def groupby(inset, keyfunc)

groupby on unsorted inset

Expand source code Browse git
def groupby(inset, keyfunc):
    """groupby on unsorted inset.  The input is sorted by keyfunc first, so each key is yielded exactly once as a (key, group iterator) pair"""
    ordered = sorted(inset, key=keyfunc)
    return itertools_groupby(ordered, keyfunc)
def groupbyasdict(togroup, keyfunc)

Return dictionary of keys and lists from groupby on unsorted inset, where keyfunc is a lambda function on elements in inset

Args

togroup
a list of elements to group
keyfunc
a lambda function to operate on elements of togroup such that the value returned from the lambda is the equality key for grouping

Returns

A dictionary with unique keys returned from keyfunc, and values are lists of elements in togroup with the same key

Expand source code Browse git
def groupbyasdict(togroup, keyfunc):
    """Return dictionary of keys and lists from groupby on unsorted inset, where keyfunc is a lambda function on elements in inset
    
    Args:
        togroup: a list of elements to group
        keyfunc:  a lambda function to operate on elements of togroup such that the value returned from the lambda is the equality key for grouping

    Returns:
        A dictionary with unique keys returned from keyfunc, and values are lists of elements in togroup with the same key

    """
    grouped = {}
    for (k, v) in groupby(togroup, keyfunc):
        grouped[k] = list(v)
    return grouped
def hascache()

Is the VIPY_CACHE environment variable set?

Expand source code Browse git
def hascache():
    """Is the VIPY_CACHE environment variable set?"""
    return os.environ.get('VIPY_CACHE') is not None
def hasextension(filename)

Does the provided filename have a file extension (e.g. /path/to/file.ext) or not (e.g. /path/to/file)

Expand source code Browse git
def hasextension(filename):
    """Return True if the provided filename has a file extension (e.g. /path/to/file.ext), False if not (e.g. /path/to/file)"""
    ext = fileext(filename)
    return ext is not None
def imcrop(img, bbox)

Crop a 2D or 3D numpy image given a vipy.geometry.BoundingBox

Expand source code Browse git
def imcrop(img, bbox):
    """Crop a 2D or 3D numpy image given a vipy.geometry.BoundingBox.

    Args:
        img: numpy array indexed as img[row, column] = img[y, x]
        bbox: an object with xmin(), xmax(), ymin(), ymax() methods.  Coordinates are rounded to the nearest integer pixel.

    Returns:
        The cropped array img[ymin:ymax, xmin:xmax]
    """
    (xmin, xmax) = (int(round(bbox.xmin())), int(round(bbox.xmax())))
    (ymin, ymax) = (int(round(bbox.ymin())), int(round(bbox.ymax())))
    return img[ymin:ymax, xmin:xmax]  # rows are y, columns are x
def imlist(imdir)

return list of images with absolute path in a directory

Expand source code Browse git
def imlist(imdir):
    """Return a list of absolute paths to the non-hidden images (as tested by isimg) found directly in a directory"""
    images = []
    for item in os.listdir(imdir):
        if isimg(item) and not is_hiddenfile(item):
            images.append(os.path.abspath(os.path.join(imdir, item)))
    return images
def imlistidx(filelist, idx_in_filename)

Return index in list of filename containing index number

Expand source code Browse git
def imlistidx(filelist, idx_in_filename):
    """Return the list of indexes of filenames in filelist that contain the decimal string of the index number anywhere in the name, including at the start"""
    needle = '%d' % idx_in_filename
    # Substring test instead of find() > 0, which skipped filenames that begin with the number
    return [i for (i, item) in enumerate(filelist) if needle in item]
def imread(imfile)

Read an image file into a numpy array using PIL. Note that color images are returned as BGR!

Expand source code Browse git
def imread(imfile):
    """Read an image file with PIL and return it as a numpy array.  Note that color images are returned with the channel
    order reversed (BGR)!  Single channel (2D) images are returned unchanged."""
    with PIL.Image.open(imfile) as im:  # close the file handle deterministically
        img = np.array(im)
    return img[:,:,::-1] if img.ndim == 3 else img
def imrescale(im, scale)
Expand source code Browse git
def imrescale(im, scale):
    """Resize a numpy image by an isotropic scale factor using PIL bilinear interpolation, rounding the new width and height to the nearest integer"""
    height = im.shape[0]
    width = im.shape[1]
    newsize = (int(np.round(scale * width)), int(np.round(scale * height)))  # PIL size is (width, height)
    return np.array(PIL.Image.fromarray(im).resize(newsize, PIL.Image.BILINEAR))
def imresize(im, rows, cols)
Expand source code Browse git
def imresize(im, rows, cols):
    """Resize a numpy image to rows=height and cols=width using PIL bilinear interpolation, returning an array with shape[0]=rows and shape[1]=cols"""
    return np.array(PIL.Image.fromarray(im).resize((cols, rows), PIL.Image.BILINEAR))  # PIL size is (width, height) = (cols, rows)
def imsavelist(imdir, outfile)

Write out all images in a directory to a provided file with each line containing absolute path to image

Expand source code Browse git
def imsavelist(imdir, outfile):
    """Write the absolute paths of all images in a directory (from `vipy.util.imlist`) to the provided file
    using `vipy.util.writelist`, and return its result"""
    images = imlist(imdir)
    return writelist(images, outfile)
def imwrite(img, imfile=None, writeas=None)

Write a floating point 2D numpy image as jet or gray, 3D numpy as rgb or bgr

Expand source code Browse git
def imwrite(img, imfile=None, writeas=None):
    """Write a floating point 2D numpy image as jet or gray, 3D numpy as
    rgb or bgr

    Args:
        img: numpy array, uint8 or float32 (float32 color images are assumed to be in [0,1] and are scaled by 255)
        imfile: [str] the output filename, a temporary png file if None
        writeas: [str] one of 'jet', 'gray', 'rgb', 'bgr'.  This is the channel interpretation of img.  If None, 2D arrays are written as 'gray' and all others as 'bgr'.

    Returns:
        The filename written

    Raises:
        ValueError: if img is not a numpy array, is not 3D for 'rgb' or 'bgr', has an unsupported dtype, or writeas is not supported
    """
    if imfile is None:
        imfile = temppng()
    if not isnumpy(img):
        raise ValueError('image must by numpy object')
    if writeas is None:
        writeas = 'gray' if img.ndim == 2 else 'bgr'

    imfile = os.path.expanduser(imfile)
    if writeas in ['jet']:
        imwritejet(img, imfile)
    elif writeas in ['gray']:
        imwritegray(img, imfile)
    elif writeas in ['rgb', 'bgr']:
        if img.ndim != 3:
            raise ValueError('numpy array must be 3D')
        if img.dtype == np.dtype('uint8'):
            pixels = img
        elif img.dtype == np.dtype('float32'):
            pixels = np.uint8(255.0 * img)  # convert to uint8
        else:
            # Previously an unsupported dtype silently wrote nothing and returned a filename that did not exist
            raise ValueError('Unsupported datatype - Numpy array must be uint8 or float32')
        if writeas == 'bgr':
            # PIL saves arrays assuming RGB channel order, so only BGR input needs its channels reversed
            pixels = np.ascontiguousarray(pixels[:, :, ::-1])
        PIL.Image.fromarray(pixels).save(imfile)
    else:
        raise ValueError('unsupported writeas')

    return imfile
def imwritegray(img, imfile=None)

Write a floating point grayscale numpy image in [0,1] as [0,255] grayscale

Expand source code Browse git
def imwritegray(img, imfile=None):
    """Write a grayscale numpy image to imfile (a temporary png if None) and return the filename.

    uint8 images are written as is, float32 images in [0,1] are scaled to [0,255].  Any other input raises ValueError.
    """
    if imfile is None:
        imfile = temppng()
    if not isnumpy(img):
        raise ValueError('Input must be numpy array')
    if img.dtype == np.dtype('uint8'):
        pixels = img  # assume that uint8 is already in the range [0,255]
    elif img.dtype == np.dtype('float32'):
        pixels = np.uint8(img * 255.0)  # convert [0, 1.0] to uint8 [0,255]
    else:
        raise ValueError('Unsupported datatype - Numpy array must be uint8 or float32')
    PIL.Image.fromarray(pixels).save(os.path.expanduser(imfile))
    return imfile
def imwritejet(img, imfile=None)

Write a grayscale numpy image as a jet colormapped image to the given file

Expand source code Browse git
def imwritejet(img, imfile=None):
    """Save a 2D numpy image to disk as a colormapped RGB image and return the filename.

    The image is passed through the matplotlib 'gist_rainbow' colormap, the alpha channel
    is dropped, and the result is scaled to uint8 before writing.

    Args:
        img: 2D numpy array
        imfile: [str] output filename, '~' is expanded.  If None, a temporary PNG filename is used.

    Raises:
        ValueError: if the input is not a numpy object or is not two dimensional
    """
    outfile = temppng() if imfile is None else imfile

    if not isnumpy(img):
        raise ValueError('Input must be numpy array')
    if img.ndim != 2:
        raise ValueError('Input must be a 2D numpy array')

    colormap = plt.get_cmap('gist_rainbow')
    rgb = np.uint8(255 * colormap(img)[:,:,:3])  # keep the first three channels only
    PIL.Image.fromarray(rgb).save(os.path.expanduser(outfile))
    return outfile
def isRTMPurl(path)
Expand source code Browse git
def isRTMPurl(path):
    """Is a path a URL with an rtmp:// or rtmps:// scheme prefix?"""
    if not isurl(path):
        return False
    return path.startswith(('rtmp://', 'rtmps://'))
def isRTSPurl(path)
Expand source code Browse git
def isRTSPurl(path):
    """Is a path a URL with an rtsp:// scheme prefix?"""
    if not isurl(path):
        return False
    return path.startswith('rtsp://')
def isS3url(path)

Is a path a URL for an S3 object?

Expand source code Browse git
def isS3url(path):
    """Is a path a URL whose scheme is 's3'?"""
    if not isurl(path):
        return False
    scheme = urlparse(path).scheme
    return scheme == 's3'
def is_email_address(email)

Is the provided string an email address?

Expand source code Browse git
def is_email_address(email):
    """Is the provided string an email address?

    This is a lightweight syntactic check (local-part@domain.tld with an alphabetic
    top level domain of at least two letters), not a full RFC 5322 validator.

    Args:
        email: [str] candidate address.  Non-string inputs return False.

    Returns:
        True if the whole string matches the pattern, False otherwise
    """
    if not isinstance(email, str):
        return False
    # The TLD class is [A-Za-z]: a '|' inside a character class is a literal pipe, not alternation
    pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return re.fullmatch(pattern, email) is not None
def is_hiddenfile(filename)

Does the filename start with a period?

Expand source code Browse git
def is_hiddenfile(filename):
    """Does the filename start with a period?

    The test is applied to the string exactly as given (no directory is stripped).
    An empty string is not hidden.
    """
    # startswith() avoids the IndexError that filename[0] raises on an empty string
    return filename.startswith('.')
def isarchive(filename)

Is filename a zip or gzip compressed tar archive?

Expand source code Browse git
def isarchive(filename):
    """Does the filename carry a known archive extension?

    The extension reported by `splitext` is compared (case insensitively) against a list of
    archive extensions.  If that fails, the extension itself (minus its leading character) is
    split once more and accepted only if the remainder is '.bz2'.
    """
    archive_exts = ['.egg', '.jar', '.tar', '.tar.bz2', '.tar.gz',
                    '.tgz', '.tz2', '.zip', '.gz']

    (_, ext) = splitext(filename)
    if (ext is not None) and (len(ext) > 0) and (ext.lower() in archive_exts):
        return True

    # Second pass: re-split the extension with its first character removed
    (_, inner) = splitext(ext[1:])
    return (inner is not None) and (len(inner) > 0) and (inner.lower() in ['.bz2'])
def iscsv(path)

Is a file a CSV file extension?

Expand source code Browse git
def iscsv(path):
    """Does the path end in a .csv extension (case insensitive)?  None returns False."""
    if path is None:
        return False
    extension = os.path.splitext(path)[1]
    return extension.lower() in ('.csv', '.CSV')
def isexe(filename)

Is the file an executable binary?

Expand source code Browse git
def isexe(filename):
    """Is the path an existing regular file with execute permission?"""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.X_OK)
def isextension(filename, ext)

Does the filename end with the extension ext?

isextension('/path/to/myfile.json', 'json') == True
isextension('/path/to/myfile.json', '.json') == True
isextension('/path/to/myfile.json', '.pkl') == False
Expand source code Browse git
def isextension(filename, ext):
    """Does the filename end with the extension ext? 
    
    ```python
    isextension('/path/to/myfile.json', 'json') == True
    isextension('/path/to/myfile.json', '.json') == True
    isextension('/path/to/myfile.json', '.pkl') == False
    isextension('/path/to/myjson', 'json') == False
    ```

    Args:
        filename: [str] the filename to test.  None returns False.
        ext: [str or tuple of str] extension(s), with or without the leading dot

    Returns:
        True if the filename ends with a dot followed by the extension
    """
    if filename is None:
        return False
    candidates = (ext,) if isinstance(ext, str) else tuple(ext)
    # Require the separating dot, so that 'myjson' is not reported as having extension 'json'
    suffixes = tuple(e if e.startswith('.') else '.' + e for e in candidates)
    return filename.endswith(suffixes)
def isfile(path)

Wrapper for os.path.isfile

Expand source code Browse git
def isfile(path):
    """Wrapper for os.path.isfile that first converts the input to a string"""
    pathstr = str(path)
    return os.path.isfile(pathstr)
def isfloat(x)

Is the input a float or a string that can be converted to float?

Expand source code Browse git
def isfloat(x):
    """Is the input a float or a string that can be converted to float?

    Returns False (rather than raising) for inputs that float() rejects, including
    None, containers and integers too large to represent.
    """
    try:
        float(x)
    except (ValueError, TypeError, OverflowError):
        # ValueError: unparseable string, TypeError: unsupported type, OverflowError: int too large
        return False
    return True
def isgif(path)

is the file a .gif extension?

Expand source code Browse git
def isgif(path):
    """Does the path have an extension, and is it .gif (case insensitive)?"""
    has_ext = hasextension(path)
    return has_ext and fileext(path).lower() == '.gif'
def ishdf5(path)

Is the file an HDF5 file?

Expand source code Browse git
def ishdf5(path):
    """Does the path end in a .h5 extension (case insensitive)?

    Only the extension is inspected; the file contents are not read.
    """
    extension = os.path.splitext(path)[1]
    return extension.lower() == '.h5'
def ishtml(filename)

Is the file an HTMLfile

Expand source code Browse git
def ishtml(filename):
    """Does the filename end in .html (case insensitive)?"""
    lowered = filename.lower()
    return lowered[-5:] == '.html'
def isimage(path)

Alias for isimg()

Expand source code Browse git
def isimage(path):
    """Alias for `vipy.util.isimg`: does the path have a supported image extension?"""
    result = isimg(path)
    return result
def isimagefile(path)

Alias for isimg()

Expand source code Browse git
def isimagefile(path):
    """Alias for `vipy.util.isimg`: does the path have a supported image extension?"""
    result = isimg(path)
    return result
def isimageobject(x)

Is an object a vipy.image class Image, ImageCategory, ImageDetection?

Expand source code Browse git
def isimageobject(x):
    """Is an object exactly one of the vipy.image classes Image, ImageCategory, ImageDetection?

    The test compares the string form of type(x), so subclasses with other names do not match.
    """
    typename = str(type(x))
    image_types = ("<class 'vipy.image.Image'>",
                   "<class 'vipy.image.ImageCategory'>",
                   "<class 'vipy.image.ImageDetection'>")
    return typename in image_types
def isimageurl(path)

Is a path a URL with image extension?

Expand source code Browse git
def isimageurl(path):
    """Is a path a URL with a supported image extension?  None returns False."""
    if path is None:
        return False
    return isurl(path) and isimg(path)
def isimg(path)

Is an object an image with a supported image extension ['.jpg','.jpeg','.png','.tif','.tiff','.pgm','.ppm','.gif','.bmp']?

Expand source code Browse git
def isimg(path):
    """Does the path end in a supported image extension ['.jpg','.jpeg','.png','.tif','.tiff','.pgm','.ppm','.gif','.bmp']?

    The comparison is case insensitive and only inspects the extension.  None returns False.
    """
    if path is None:
        return False
    extension = os.path.splitext(path)[1].lower()
    return extension in ['.jpg', '.jpeg', '.png', '.tif', '.tiff', '.pgm', '.ppm', '.gif', '.bmp']
def isimgfile(path)

Alias for isimg()

Expand source code Browse git
def isimgfile(path):
    """Alias for `vipy.util.isimg`: does the path have a supported image extension?"""
    result = isimg(path)
    return result
def isinstalled(cmd)

Is the command available on the path?

Expand source code Browse git
def isinstalled(cmd):
    """Is the command available on the path, as resolved by shutil.which?"""
    located = shutil.which(cmd)
    return located is not None
def isjpeg(path)

is the file a .jpg or .jpeg extension?

Expand source code Browse git
def isjpeg(path):
    """Is the file a .jpg or .jpeg extension (case insensitive)?"""
    # Both extension tests are guarded by hasextension().  The unparenthesized
    # "A and B or C" form evaluated fileext(path).lower() even without an extension.
    return hasextension(path) and fileext(path).lower() in ('.jpg', '.jpeg')
def isjpg(path)

Alias for isjpeg()

Expand source code Browse git
def isjpg(path):
    """Alias for `vipy.util.isjpeg`: is the file a .jpg or .jpeg extension?"""
    result = isjpeg(path)
    return result
def isjsonfile(filename)
Expand source code Browse git
def isjsonfile(filename):
    """Is the input a string longer than five characters that ends in '.json' (case sensitive)?"""
    if not isinstance(filename, str):
        return False
    if len(filename) <= 5:
        return False  # a bare '.json' with no basename does not count
    return filename[-5:] == '.json'
def islinux()

is the current platform Linux?

Expand source code Browse git
def islinux():
    """Is the current platform Linux?

    Returns False on platforms where os.uname is unavailable (it is Unix-only),
    rather than raising AttributeError.
    """
    uname = getattr(os, 'uname', None)
    return uname is not None and uname()[0] == 'Linux'  # field 0 is sysname
def islist(x)

Is an object a python list

Expand source code Browse git
def islist(x):
    """Is the type of an object exactly the python list (subclasses of list do not match)?"""
    kind = type(x)
    return kind is list
def islistoflists(x)

Is an object a python list of lists x=[[1,2], [3,4]]

Expand source code Browse git
def islistoflists(x):
    """Is an object a python list of lists x=[[1,2], [3,4]]

    Only the first element is inspected.  An empty list returns False.
    """
    # The length check avoids the IndexError that x[0] raises on an empty list
    return type(x) is list and len(x) > 0 and type(x[0]) is list
def ismacosx()

Is the current platform MacOSX?

Expand source code Browse git
def ismacosx():
    """Is the current platform MacOSX?

    Returns False on platforms where os.uname is unavailable (it is Unix-only),
    rather than raising AttributeError.
    """
    uname = getattr(os, 'uname', None)
    return uname is not None and uname()[0] == 'Darwin'  # field 0 is sysname
def isnumber(x)

Is the input a python type of a number or a string containing a number?

Expand source code Browse git
def isnumber(x):
    """Is the input a python int or float, a numpy scalar, or a string that converts to float?"""
    if isinstance(x, (int, float)):
        return True
    if isnumpy(x) and np.isscalar(x):
        return True
    return isstring(x) and isfloat(x)
def isnumpy(obj)

Is a python object a numpy object?

Expand source code Browse git
def isnumpy(obj):
    """Is a python object a numpy object?

    The test checks whether the string form of type(obj) contains 'numpy', so it matches
    arrays and numpy scalar types alike.
    """
    typename = str(type(obj))
    return 'numpy' in typename
def isnumpyarray(obj)

Is a python object a numpy array?

Expand source code Browse git
def isnumpyarray(obj):
    """Is a python object a numpy array?

    The test checks whether the string form of type(obj) contains 'numpy.ndarray'
    (which necessarily also contains 'numpy').
    """
    typename = str(type(obj))
    return 'numpy' in typename and 'numpy.ndarray' in typename
def ispickle(filename)

Is the file a pickle archive file

Expand source code Browse git
def ispickle(filename):
    """Is the path an existing file that is named like a pickle archive?

    The file must exist on disk, and either its extension as reported by `fileext` is
    .pk or .pkl (case insensitive) or the name ends in '.pkl'.
    """
    if not (isfile(filename) and os.path.exists(filename)):
        return False
    ext = fileext(filename)
    if (ext is not None) and fileext(filename).lower() in ['.pk', '.pkl']:
        return True
    return filename[-4:] == '.pkl'
def ispkl(filename)

Is the file a pickle archive file

Expand source code Browse git
def ispkl(filename):
    """Is the input a string that ends in '.pkl' (case sensitive)?  Non-strings return False."""
    if not isinstance(filename, str) or len(filename) < 4:
        return False
    return filename[-4:] == '.pkl'
def ispklbz2(filename)

Is the file a bz2 compressed pickle archive file (.pkl.bz2)?

Expand source code Browse git
def ispklbz2(filename):
    """Is the input a string that ends in '.pkl.bz2' (case sensitive)?  Non-strings return False."""
    if not isinstance(filename, str) or len(filename) < 8:
        return False
    return filename[-8:] == '.pkl.bz2'
def ispklfile(filename)

Is the file a pickle archive file

Expand source code Browse git
def ispklfile(filename):
    """Alias for `vipy.util.ispkl`: does the filename end in '.pkl'?"""
    result = ispkl(filename)
    return result
def ispng(path)

is the file a .png or .apng extension?

Expand source code Browse git
def ispng(path):
    """Does the path have an extension, and is it .png or .apng (case insensitive)?"""
    has_ext = hasextension(path)
    return has_ext and (fileext(path).lower() == '.png' or fileext(path).lower() == '.apng')
def isstring(s)

Is an object a python string or unicode string?

Expand source code Browse git
def isstring(s):
    """Is an object an instance of the python3 str type (including subclasses)?"""
    result = isinstance(s, str)
    return result
def istar(filename)

Is the filename a .tar extension?

Expand source code Browse git
def istar(filename):
    """Do the last four characters of the filename spell '.tar' (case sensitive)?"""
    suffix = filename[-4:]
    return suffix == '.tar'
def istarbz2(filename)

Is the filename a .tar.bz2 extension?

Expand source code Browse git
def istarbz2(filename):
    """Do the last eight characters of the filename spell '.tar.bz2' (case sensitive)?

    A bare '.bz2' extension without '.tar' does not match.
    """
    suffix = filename[-8:]
    return suffix == '.tar.bz2'
def istextfile(path)

Is the given file a text file?

Expand source code Browse git
def istextfile(path):
    """Is the given file a non-hidden text file with a .txt extension (case insensitive)?

    The hidden-file test is applied to the basename, so relative paths such as
    './notes.txt' are accepted and '/a/b/.notes.txt' is rejected.  None returns False.
    """
    if path is None:
        return False
    (root, ext) = os.path.splitext(path)
    if ext.lower() != '.txt':
        return False
    # Inspect the basename, not the first character of the whole path
    return not os.path.basename(root).startswith('.')
def istgz(filename)

Is the filename a .tgz or .tar.gz extension?

Expand source code Browse git
def istgz(filename):
    """Does the filename end in '.tgz' or '.tar.gz' (case sensitive)?"""
    if filename[-4:] == '.tgz':
        return True
    return filename[-7:] == '.tar.gz'
def istuple(x)

Is an object a python tuple?

Expand source code Browse git
def istuple(x):
    """Is an object an instance of the python tuple type (including subclasses)?"""
    result = isinstance(x, tuple)
    return result
def istupleoftuples(x)

Is an object a python tuple of tuples x=((1,2), (3,4))

Expand source code Browse git
def istupleoftuples(x):
    """Is an object a python tuple of tuples x=((1,2), (3,4))

    Only the first element is inspected.  An empty tuple returns False.
    """
    # The length check avoids the IndexError that x[0] raises on an empty tuple
    return type(x) is tuple and len(x) > 0 and type(x[0]) is tuple
def isuint8(img)
Expand source code Browse git
def isuint8(img):
    """Is the input a numpy object whose dtype is uint8?"""
    if not isnumpy(img):
        return False
    return img.dtype == np.dtype('uint8')
def isurl(path)

Is a path a URL? It requires a url scheme and url netloc without any common unallowed characters

Expand source code Browse git
def isurl(path):
    """Is a path a URL?  It requires a url scheme and url netloc without any common unallowed characters

    Args:
        path: [str] candidate URL.  Inputs that cannot be parsed (e.g. None, non-strings) return False.

    Returns:
        True if the path parses with a non-empty scheme and netloc and contains none of '>', '<', '"'
    """
    try:
        url = urlparse(path)
        if any(c in path for c in ('>', '<', '"')):
            return False
        return bool(url.scheme) and bool(url.netloc)
    except (TypeError, ValueError, AttributeError):
        # Unparseable or non-string input.  A bare except would also swallow KeyboardInterrupt/SystemExit.
        return False
def isvideo(path)

Is a filename in path a video with a known video extension ['.avi','.mp4','.mov','.wmv','.mpg','.mkv','.webm','.3gp']?

Expand source code Browse git
def isvideo(path):
    """Does the path end in a known video extension ['.avi','.mp4','.mov','.wmv','.mpg','.mkv','.webm','.3gp']?

    The comparison is case insensitive and only inspects the extension.  None returns False.
    """
    if path is None:
        return False
    extension = os.path.splitext(path)[1].lower()
    return extension in ['.avi','.mp4','.mov','.wmv','.mpg', '.mkv', '.webm', '.3gp']
def isvideofile(path)

Alias for isvideo()

Expand source code Browse git
def isvideofile(path):
    """Alias for `vipy.util.isvideo`: does the path have a known video extension?"""
    result = isvideo(path)
    return result
def isvideoobject(x)
Expand source code Browse git
def isvideoobject(x):
    """Alias for `vipy.util.isvideotype`"""
    result = isvideotype(x)
    return result
def isvideotype(x)

Is an object a vipy.video class Video, VideoCategory, Scene?

Expand source code Browse git
def isvideotype(x):
    """Is an object exactly one of the vipy.video classes Video, VideoCategory, Scene?

    The test compares the string form of type(x), so subclasses with other names do not match.
    """
    typename = str(type(x))
    video_types = ("<class 'vipy.video.Video'>",
                   "<class 'vipy.video.VideoCategory'>",
                   "<class 'vipy.video.Scene'>")
    return typename in video_types
def isvideourl(path)

Is a path a URL with video extension?

Expand source code Browse git
def isvideourl(path):
    """Is a path a URL with a known video extension?"""
    if not isurl(path):
        return False
    return isvideo(path)
def isvipyobject(x)
Expand source code Browse git
def isvipyobject(x):
    """Is the input a vipy image or video, or a list, tuple or dict whose values are all vipy images or videos?

    Returns:
        True if x is a `vipy.image.Image` or `vipy.video.Video` (or subclass), or a list/tuple
        whose elements are all such objects, or a dict whose values are all such objects.
    """
    import vipy.image
    import vipy.video

    def _ismedia(v):
        return isinstance(v, (vipy.image.Image, vipy.video.Video))

    if _ismedia(x):
        return True
    if islist(x) or istuple(x):
        # Elements are checked for lists as well as tuples.  The unparenthesized
        # "islist(x) or istuple(x) and all(...)" returned True for any list.
        return all(_ismedia(v) for v in x)
    return isinstance(x, dict) and all(_ismedia(v) for v in x.values())
def iswebp(path)

is the file a .webp extension?

Expand source code Browse git
def iswebp(path):
    """Does the path have an extension, and is it .webp (case insensitive)?"""
    has_ext = hasextension(path)
    return has_ext and fileext(path).lower() == '.webp'
def isxml(path)

Is the given file an xml file?

Expand source code Browse git
def isxml(path):
    """Does the path end in a .xml extension (case insensitive)?"""
    extension = os.path.splitext(path)[1]
    return extension.lower() == '.xml'
def isyoutubeurl(path)

Is a path a youtube URL?

Expand source code Browse git
def isyoutubeurl(path):
    """Is a path a URL that contains 'youtube.com' or 'youtu.be' anywhere in the string?"""
    if not isurl(path):
        return False
    return any(host in path for host in ('youtube.com', 'youtu.be'))
def jet(n, bgr=False)

jet colormap

Expand source code Browse git
def jet(n, bgr=False):
    """Return the matplotlib 'jet' colormap sampled at n levels as a uint8 array with one row per level.

    Args:
        n: [int] number of colormap levels
        bgr: [bool] if True, reverse the column order of each row

    Returns:
        A uint8 numpy array with n rows, scaled to [0,255]
    """
    # pyplot.get_cmap is used (as in imwritejet) since matplotlib.cm.get_cmap is removed in newer matplotlib
    cmap = plt.get_cmap('jet', n)
    rgb = np.uint8(255 * cmap(np.arange(n)))
    # NOTE(review): matplotlib colormaps return RGBA rows, so fliplr presumably yields (A,B,G,R)
    # rather than (B,G,R) - confirm what callers of bgr=True expect before changing
    return rgb if bgr is False else np.fliplr(rgb)
def jsonlist(indir)

Return list of JSON files with absolute path in a directory

Expand source code Browse git
def jsonlist(indir):
    """Return a list of absolute paths of the JSON files directly inside a directory.

    A file matches if its name is longer than five characters and ends in '.json'.
    The directory is not searched recursively.
    """
    # The name suffix is compared directly rather than using extlist(), which the
    # original author flagged as broken for filenames containing several dots
    found = []
    for item in os.listdir(indir):
        if len(item) > 5 and item[-5:] == '.json':
            found.append(os.path.abspath(os.path.join(indir, item)))
    return found
def keymax(d)

Return key in dictionary containing maximum value

Expand source code Browse git
def keymax(d):
    """Return the first key (in iteration order) whose value equals the maximum value in the dictionary"""
    largest = max(d.values())
    return next((key for (key, value) in d.items() if value == largest), None)
def keymin(d)

Return key in dictionary containing minimum value

Expand source code Browse git
def keymin(d):
    """Return the first key (in iteration order) whose value equals the minimum value in the dictionary"""
    smallest = min(d.values())
    return next((key for (key, value) in d.items() if value == smallest), None)
def linuxversion()

Return linux version

Expand source code Browse git
def linuxversion():
    """Return the 'major.minor' version string parsed from the first line of /etc/redhat-release.

    Returns:
        The version string (e.g. '7.9'), or None if the platform is not linux, the release
        file cannot be read, or its first line does not contain a version.
    """
    if not islinux():
        return None
    try:
        with open('/etc/redhat-release') as f:
            firstline = f.readline()
    except OSError:
        return None  # the release file is not readable on this system
    m = re.match(r'[a-zA-Z ]+([0-9]+\.[0-9]+)', firstline)
    return m.group(1) if m is not None else None
def listext(indir, ext)

Alias for extlist

Expand source code Browse git
def listext(indir, ext):
    """Alias for extlist with the same arguments"""
    result = extlist(indir, ext)
    return result
def listjson(indir)

Alias for jsonlist

Expand source code Browse git
def listjson(indir):
    """Alias for jsonlist with the same argument"""
    result = jsonlist(indir)
    return result
def listpkl(indir)

Return a list of paths of pickle files (.pk or .pkl) in the directory indir

Expand source code Browse git
def listpkl(indir):
    """Return a list of paths (indir joined with each entry) for the entries of indir that satisfy `ispickle`"""
    candidates = (os.path.join(indir, item) for item in os.listdir(indir))
    return [path for path in candidates if ispickle(path)]
def load(infile, abspath=True, refcycle=True)

Load variables from a relocatable archive file format, either dill pickle, JSON format or JSON directory format.

Loading is performed by attempting the following:

  1. If the input file is a directory, return a Dataset with lazy loading of all pkl or json files recursively discovered in this directory.
  2. If the input file is a pickle or json file, load it
  3. if abspath=true, then convert relative paths to absolute paths for object when loaded
  4. If refcycle=False, then disable the python reference cycle garbage collector for large archive files
im = vipy.image.owl()
f = vipy.util.save(im)
im = vipy.util.load(f)

Args: infile: [str] file saved using save() with extension [.pkl, .json]. This may also be a directory tree containing json or pkl files abspath: [bool] If true, then convert all vipy objects with relative paths to absolute paths. If False, then preserve relative paths and warn user. refcycle: [bool] If False, then disable python reference cycle garbage collector. This is useful for large python objects.

Returns: The object in the archive file

Expand source code Browse git
def load(infile, abspath=True, refcycle=True):
    """Load variables from a relocatable archive file format, either dill pickle, JSON format or JSON directory format.
       
       Loading is performed by attempting the following:

       1. If the input file is a directory, return a `vipy.dataset.Dataset` with lazy loading of all pkl or json files recursively discovered in this directory.
       2. If the input file is a pickle or json file, load it
       3. if abspath=true, then convert relative paths to absolute paths for object when loaded
       4. If refcycle=False, then disable the python reference cycle garbage collector for large archive files
    
    ```python
    im = vipy.image.owl()
    f = vipy.util.save(im)
    im = vipy.util.load(f)
    ```

       Args:
           infile: [str] file saved using `vipy.util.save` with extension [.pkl, .json].  This may also be a directory tree containing json or pkl files 
           abspath: [bool] If true, then convert all vipy objects with relative paths to absolute paths. If False, then preserve relative paths and warn user.
           refcycle: [bool] If False, then disable python reference cycle garbage collector.  This is useful for large python objects.
       
       Returns:
           The object in the archive file

       Raises:
           ValueError: if the file is not a pkl, json or bz2 file and is not a directory
    """
    infile = os.path.abspath(os.path.expanduser(infile))

    if ispkl(infile):
        # NOTE: unpickling can execute arbitrary code.  Only load archives from trusted sources.
        with open(infile, 'rb') as f:
            obj = dill.load(f)
    elif isjsonfile(infile):
        with open(infile, 'r') as f:
            loadobj = json.load(f)
        registry = class_registry()
        assert isinstance(loadobj, list) or isinstance(loadobj, dict), "invalid vipy JSON serialization format"
        if isinstance(loadobj, list) and all([isinstance(d, dict) for d in loadobj]) and all([c in registry for d in loadobj for (c,v) in d.items()]):
            # List of {classname: serialized} dictionaries
            obj = [registry[c](v) for d in loadobj for (c,v) in d.items()]
        elif isinstance(loadobj, dict) and all([c in registry for (c,d) in loadobj.items()]):
            # Single {classname: serialized} dictionary, unpacked to the object itself if there is only one
            obj = [registry[c](v) for (c,v) in loadobj.items()]
            obj = obj[0] if len(obj) == 1 else obj
        else:
            # Plain JSON that is not a vipy serialization
            obj = loadobj
    elif isbz2(infile):
        return bz2pkl(infile)
    elif os.path.isdir(infile):        
        import vipy.dataset
        return vipy.dataset.Dataset(infile)
    else:
        raise ValueError('unknown file type')
    
    if len(tolist(obj)) == 0:
        return obj  
    testobj = tolist(obj)[0]  # the first element is used as the representative for path checks

    # Relocatable object?
    if hasattr(testobj, 'filename') and testobj.filename() is not None:
        if not os.path.isabs(testobj.filename()):
            if not abspath:
                warnings.warn('Loading archive "%s" with relative paths.  Changing directory to "%s".  Disable this warning with vipy.util.load(..., abspath=True).' % (infile, filepath(infile)))
                os.chdir(filepath(infile))
            else:
                # Absolute path?  The loaded archive will no longer be relocatable if you save this to a new archive, and the videos directory cannot be moved
                pwd = os.getcwd()  # save current directory
                os.chdir(filepath(infile))  # change to archive directory
                try:
                    objout = [o.abspath() if o.filename() is not None else o for o in tolist(obj)]  # set absolute paths relative to archive directory
                finally:
                    os.chdir(pwd)  # always restore current directory, even if abspath() raises
                obj = objout if isinstance(obj, list) else objout[0]
        elif not testobj.hasfilename():
            warnings.warn('Loading "%s" that contains path (e.g. "%s") which does not exist' % (infile, testobj.filename()))

    # Large vipy object?  Disable garbage collection.
    #   - Python uses reference counting for the primary garbage collection mechanism, but also uses reference cycle checks to search for dependencies between objects.
    #   - All vipy objects are self contained, and do not have reference cycles.  However, there is no way to mark an individual object which does not participate in reference cycle counting.
    #   - This means that for a large number of vipy objects, garbage collection can take minutes searching for cycles which are never there.  To fix this, globally disable the garbage collector.
    #   - Note that reference counting is still performed, we are just disabling reference *cycle* counting using the generational garbage collector.
    #   - This can be re-enabled at any time by "import gc; gc.enable()"
    #   - If you use %autoreload iPython magic command, note that this will be very slow.  You should set %autoreload 0
    #   - Alternatively, load as JSON and all attributes will be unpacked on demand and stored in a packed format that is not tracked (e.g. tuple of strings) by the reference cycle counter
    if not refcycle:
        warnings.warn('Disabling python reference cycle garbage collection.  Re-enable at any time using "import gc; gc.enable()"')
        import gc
        gc.disable()
    return obj
def load_opencv_yaml(yamlfile)

Load a numpy array from YAML file exported from OpenCV

Expand source code Browse git
def load_opencv_yaml(yamlfile):
    """Load a numpy array from YAML file exported from OpenCV, with singleton dimensions removed"""
    # NOTE(review): `cv` is not among the imports visible at the top of this module - confirm it is defined
    loaded = cv.Load(yamlfile)
    return np.squeeze(np.array(loaded))
def loadh5(filename)

Load an HDF5 file

Expand source code Browse git
def loadh5(filename):
    """Load the dataset named after the file's basename from an HDF5 file.

    The dataset key is `filebase(filename)`.  The dataset is read fully into memory and
    the file is closed before returning.

    Raises:
        ValueError: if the filename does not have an HDF5 extension
    """
    if not ishdf5(filename):
        raise ValueError('Invalid HDF5 file "%s" ' % filename)
    try_import('h5py'); import h5py
    with h5py.File(filename, 'r') as f:
        # dataset[()] reads the whole dataset; it replaces the Dataset.value attribute removed in h5py 3
        return f[filebase(filename)][()]
def loadmat73(matfile, keys=None)

Matlab 7.3 format, keys should be a list of keys to access HDF5 file as f[key1][key2]… Returned as numpy array

Expand source code Browse git
def loadmat73(matfile, keys=None):
    """Matlab 7.3 format, keys should be a list of keys to access HDF5
    file as f[key1][key2]...  Returned as numpy array

    Args:
        matfile: [str] path to a Matlab 7.3 (HDF5) file
        keys: [list] nested keys to descend.  If None, the open h5py.File is returned and
            the caller is responsible for closing it.

    Returns:
        The open h5py.File if keys is None, otherwise a numpy array of the selected node
    """
    try_import('h5py'); import h5py
    f = h5py.File(matfile, 'r')
    if keys is None:
        return f
    try:
        node = f
        for k in keys:
            node = node[k]
        return np.array(node)  # copied into memory before the file is closed
    finally:
        f.close()
def loudcatcher(f, prepend, *args, **kwargs)

Call the function f with the provided arguments, and return (True, result) on success and (False, exception) if there is any thrown exception. Print the exception immediately. Useful for parallel processing

Expand source code Browse git
def loudcatcher(f, prepend, *args, **kwargs):
    """Call f(*args, **kwargs) and report the outcome as a tuple instead of raising.

    Returns:
        (True, result) on success.  On any Exception, the message is printed immediately with
        the given prefix and (False, message) is returned.  Useful for parallel processing.
    """
    assert callable(f)
    try:
        result = f(*args, **kwargs)
    except Exception as e:
        message = str(e)
        print('%s%s' % (prepend, message))
        return (False, message)
    return (True, result)
def mat2gray(img, min=None, max=None)

Convert numpy array to float32 with 1.0=max and 0=min

Expand source code Browse git
def mat2gray(img, min=None, max=None):
    """Linearly rescale a numpy array so that min maps to 0 and max maps to 1.

    Args:
        img: numpy array
        min: lower bound, defaults to the array minimum
        max: upper bound, defaults to the array maximum

    Returns:
        (float32(img) - min) / (max - min).  If max - min is not positive, the input is
        returned unchanged (and unconverted).
    """
    lower = np.min(img) if min is None else min
    upper = np.max(img) if max is None else max
    if not ((upper - lower) > 0):
        return img  # degenerate range, nothing to rescale
    return (np.float32(img) - lower) / (upper - lower)
def matread(txtfile, delimiter=' ')

Whitespace separated values defining columns, lines define rows. Return numpy array

Expand source code Browse git
def matread(txtfile, delimiter=' '):
    """Read a text file of delimited numbers into a float32 numpy array, one row per line.

    Args:
        txtfile: [str] path to the text file
        delimiter: [str] column separator.  A whitespace delimiter (the default) splits on
            any run of whitespace.  Other delimiters split on the exact string.

    Returns:
        A float32 numpy array with one row per non-blank line
    """
    rows = []
    # Text mode: in binary mode each line is bytes and cannot be split on a str delimiter
    with open(txtfile, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            fields = line.split() if delimiter.isspace() else line.split(delimiter)
            rows.append(np.array([float(v) for v in fields], dtype=np.float32))
    return np.array(rows)
def matrix_to_opencv_yaml(yamlfile, mtxlist, mtxname=None)

Write list of matrices to OpenCV yaml file format with given variable names

Expand source code Browse git
def matrix_to_opencv_yaml(yamlfile, mtxlist, mtxname=None):
    """Write list of matrices to OpenCV yaml file format with given
    variable names

    Args:
        yamlfile: [str] output filename, opened for writing (overwritten)
        mtxlist: a numpy matrix or list of numpy matrices, passed through `tolist`
        mtxname: names for each matrix, paired in order with mtxlist.  If None, the
            names 'mtx_00', 'mtx_01', ... are generated.

    Returns:
        The yamlfile that was written
    """
    def _write_matrix(f, M, mtxname):
        # Emit one '!!opencv-matrix' entry: header fields followed by the flattened data
        f.write('    %s: !!opencv-matrix\n' % mtxname)
        f.write('       rows: %d\n' % M.shape[0])
        f.write('       cols: %d\n' % (M.shape[1] if M.ndim == 2 else 1))  # non-2D arrays are written as one column
        f.write('       dt: f\n')
        f.write('       data: [ ')
        datastr = ''
        for (k, x) in enumerate(M.flatten()):
            datastr += '%.6e' % x
            if (k + 1 == M.size):
                # Last element: flush what remains without a trailing comma
                f.write(datastr)
                break
            datastr += ', '
            if ((k + 1) % 4) == 0:
                # Wrap the line after every fourth value, indenting the continuation
                f.write(datastr + '\n           ')
                datastr = ''
        f.write(']\n')

    # Normalize the inputs, then write the YAML header followed by one entry per matrix
    mtxlist = tolist(mtxlist)
    if mtxname is None:
        mtxname = ['mtx_%02d' % k for k in range(0, len(mtxlist))]
    with open(yamlfile, 'w') as f:
        f.write('%YAML:1.0\n')
        # zip() stops at the shorter input.  NOTE(review): a bare string mtxname is iterated character by character
        for (m, mname) in zip(mtxlist, mtxname):
            _write_matrix(f, m, mname)

    return yamlfile
def mdlist(m, n)

Preallocate 2D list of size MxN

Expand source code Browse git
def mdlist(m, n):
    """Preallocate a 2D list with m rows of n None entries, where each row is a distinct list"""
    grid = []
    for _ in range(m):
        grid.append([None] * n)
    return grid
def mediaextension(filename)

Return '.mp4' for filename='/a/b/c.mp4'

Expand source code Browse git
def mediaextension(filename):
    """Return '.mp4' for filename='/a/b/c.mp4'.  Delegates to `fileext` with multidot=False."""
    extension = fileext(filename, multidot=False)
    return extension
def mergedict(d1, d2)

Combine keys of two dictionaries and return a dictionary deep copy.

d1 = {1:2}
d2 = {3:4}
d3 = mergedict(d1,d2)
assert d3 == {1:2, 3:4}
Expand source code Browse git
def mergedict(d1, d2):
    """Combine keys of two dictionaries and return a dictionary deep copy.

    Keys in d2 take precedence over keys in d1.  Neither input is modified, and the
    returned dictionary shares no mutable values with either input.
    
    ```python
    d1 = {1:2}
    d2 = {3:4}
    d3 = mergedict(d1,d2)
    assert d3 == {1:2, 3:4}
    ```

    """
    assert isinstance(d1, dict) and isinstance(d2, dict)
    d = copy.deepcopy(d1)
    d.update(copy.deepcopy(d2))  # deep copy d2 as well, so the result is independent of both inputs
    return d
def minutestamp()

Return date and time string in form DDMMMYY_HHMMAM (12-hour clock with AM/PM suffix)

Expand source code Browse git
def minutestamp():
    """Return the local date and time as an uppercase string in the form DDMMMYY_HHMMAM (12-hour clock with AM/PM suffix)"""
    stamp = strftime("%d%b%y_%I%M%p", localtime())
    return stamp.upper()
def mktemp(ext)

Create a temporary file with extension .ext

Expand source code Browse git
def mktemp(ext):
    """Return the result of `tempfilename` with suffix '.' + ext, where ext is given without a leading dot"""
    suffix = '.' + ext
    return tempfilename(suffix=suffix)
def most_frequent(inset, topk=1)

Return the most frequent element as determined by element equality

Expand source code Browse git
def most_frequent(inset, topk=1):
    """Return a list of the topk most frequent elements as determined by element equality.

    The keys of `countby(inset)` are taken in order and the last topk are returned
    (presumably countby orders keys by increasing count - confirm).  If topk is None,
    all keys are returned.
    """
    ranked = list(countby(inset).keys())
    if topk is None:
        return ranked
    return ranked[-topk:]
def newbase(filename, base)

Convert filename=/a/b/c.ext base=d -> /a/b/d.ext

Expand source code Browse git
def newbase(filename, base):
    """Convert filename=/a/b/c.ext base=d -> /a/b/d.ext"""
    directory = filepath(filename)
    extension = fileext(filename, withdot=False)
    return os.path.join(directory, '%s.%s' % (base, extension))
def newpath(filename, newdir)

Return /d/e/c.ext for filename /a/b/c.ext and newdir /d/e/

Expand source code Browse git
def newpath(filename, newdir):
    """Return /d/e/c.ext for filename /a/b/c.ext and newdir /d/e/"""
    tail = os.path.basename(filename)
    return os.path.join(newdir, tail)
def newpathdir(filename, olddir, newdir, n=1)

Return /a/b/n/d/e.ext for filename=/a/b/c/d/e.ext, olddir=c, newdir=n

Expand source code Browse git
def newpathdir(filename, olddir, newdir, n=1):
    """Return /a/b/n/d/e.ext for filename=/a/b/c/d/e.ext, olddir=c, newdir=n

    Args:
        filename: [str] the path to rewrite
        olddir: [str] path component to replace.  It must equal a whole component.
        newdir: [str] replacement component
        n: [int] the exact number of components that must equal olddir

    Returns:
        The path with every component equal to olddir replaced by newdir
    """
    parts = pathlib.PurePath(filename).parts
    assert sum([d == olddir for d in parts]) == n, "Path must have exactly %s directory matches" % n
    # Replace whole components only.  A substring replace would also rewrite 'dc' -> 'dn' for olddir='c'.
    return os.path.join(*[newdir if d == olddir else d for d in parts])
def newpathroot(filename, newroot)

Return /r/b/c.ext for filename /a/b/c.ext and new root directory r

Expand source code Browse git
def newpathroot(filename, newroot):
    """Return /r/b/c.ext for filename /a/b/c.ext and new root directory r

    For a relative path the first component is replaced.  For a rooted path the first
    component after the root is replaced.
    """
    pure = pathlib.PurePath(filename)
    components = list(pure.parts)
    index = 0 if len(pure.root) == 0 else 1  # skip over the leading root component, if any
    components[index] = newroot
    return os.path.join(*components)
def newprefix(filename, newprefix, depth=0)

Return /a/b/c/h/i.ext for filename /f/g/h/i.ext and prefix /a/b/c and depth=1

Expand source code Browse git
def newprefix(filename, newprefix, depth=0):
    """Return /a/b/c/h/i.ext for filename /f/g/h/i.ext and prefix /a/b/c and depth=1

    The old prefix is `filepath(filename, depth=depth)`.  Occurrences of it in the filename
    are replaced by the new prefix and the result is normalized with os.path.normpath.
    """
    oldprefix = filepath(filename, depth=depth)
    replaced = filename.replace(oldprefix, newprefix)
    return os.path.normpath(replaced)
def noextension(filename, ext=None)

Convert filename='/path/to/myfile.ext' or filename='/path/to/myfile.ext1.ext2.ext3' to /path/to/myfile with no extension, removing the appended string past the first dot

Expand source code Browse git
def noextension(filename, ext=None):
    """Convert filename='/path/to/myfile.ext' or filename='/path/to/myfile.ext1.ext2.ext3' to /path/to/myfile with no extension, removing the appended string past the first dot

    Args:
        filename: [str] the path to strip
        ext: [str] if provided, every occurrence of this string is removed from the filename
            instead of truncating at the first dot

    Returns:
        The filename without extension.  Dots in the directory part (e.g. './file.ext' or
        '/path.d/file.ext') are preserved, only the basename is truncated.
    """
    if ext is not None:
        return filename.replace(ext, '')
    # Truncate at the first dot of the basename only, keeping the directory prefix verbatim
    tail = os.path.basename(filename)
    prefix = filename[:len(filename) - len(tail)]
    return prefix + tail.split('.')[0]
def nonecatcher(f, *args, **kwargs)

Call the function f with the provided arguments, and return (result) on success and (None) if there is any thrown exception. Useful for parallel processing

Expand source code Browse git
def nonecatcher(f, *args, **kwargs):
    """Call f(*args, **kwargs) and return its result, or None if any Exception is raised.  Useful for parallel processing"""
    assert callable(f)
    try:
        result = f(*args, **kwargs)
    except Exception:
        result = None
    return result
def pairwise(iterable, prepad=False, postpad=False, padval=None)

Equivalent to python-3.10 itertools.pairwise.

>>> pairwise('ABCD') --> (A,B), (B,C), (C,D)
>>> pairwise('ABCD', prepad=True, padval=0) --> (0,A), (A,B), (B,C), (C,D)
>>> pairwise('ABCD', postpad=True) --> (A,B), (B,C), (C,D), (D,None)
>>> pairwise([(1,1),(2,2)], prepad=True, postpad=True, padval=(None,None)) --> [((None, None), (1, 1)), ((1, 1), (2, 2)), ((2, 2), (None, None))]
Expand source code Browse git
def pairwise(iterable, prepad=False, postpad=False, padval=None):
    """Iterate over consecutive overlapping pairs, like python-3.10 itertools.pairwise, with optional padding at either end.

    >>> pairwise('ABCD') --> (A,B), (B,C), (C,D)
    >>> pairwise('ABCD', prepad=True, padval=0) --> (0,A), (A,B), (B,C), (C,D)
    >>> pairwise('ABCD', postpad=True) --> (A,B), (B,C), (C,D), (D,None)
    >>> pairwise([(1,1),(2,2)], prepad=True, postpad=True, padval=(None,None)) --> [((None, None), (1, 1)), ((1, 1), (2, 2)), ((2, 2), (None, None))]

    Args:
        iterable: any iterable
        prepad: if true, emit (padval, first) before the first real pair
        postpad: if true, emit (last, padval) after the last real pair
        padval: the padding value

    Returns:
        A zip iterator of 2-tuples
    """
    (lead, follow) = tee(iterable, 2)
    if not prepad:
        next(follow, None)  # shift the trailing iterator one step ahead of the leading one
    else:
        lead = chain([padval], lead)  # the padding takes the place of the one-step shift
    if postpad:
        follow = chain(follow, [padval])
    return zip(lead, follow)
def permutelist(inlist, deterministic=False, seed=42)

randomly permute list order. Permutation is deterministic (same permutation on multiple calls) if specified

Expand source code Browse git
def permutelist(inlist, deterministic=False, seed=42):
    """Randomly permute the order of a list.

    Args:
        inlist: an indexable sequence with a length
        deterministic: if true, return the same permutation on every call for a given seed
        seed: the seed used when deterministic=True

    Returns:
        A new list containing the elements of inlist in permuted order

    .. note:: The deterministic permutation is drawn from a private RandomState, so the global numpy random state is neither reseeded nor re-randomized by this call.  The permutation is the same as seeding the global generator with this seed.
    """
    rng = np.random.RandomState(seed) if deterministic else np.random
    return [inlist[k] for k in rng.permutation(list(range(len(inlist))))]
def pklbz2(filename, obj=None)

Read/Write a bz2 compressed pickle file

Expand source code Browse git
def pklbz2(filename, obj=None):
    """Read/Write a bz2 compressed pickle file.

    Args:
        filename: path to the archive, must end in '.pkl.bz2'
        obj: if not None, pickle this object to filename.  If None, load and return the object stored in filename

    Returns:
        filename when writing, the unpickled object when reading

    .. warning:: Unpickling executes arbitrary code, only read archives from trusted sources.
    """
    assert filename[-8:] == '.pkl.bz2', "Invalid filename - must be '*.pkl.bz2'"
    if obj is not None:
        # Context manager closes the compressed stream even if pickling fails
        with bz2.BZ2File(filename, 'wb') as f:
            cPickle.dump(obj, f)
        return filename
    with bz2.BZ2File(filename, 'rb') as f:
        return cPickle.load(f)
def pklist(indir)

Return a list of absolute paths of *.pk files in current directory

Expand source code Browse git
def pklist(indir):
    """Alias for `vipy.util.listpkl`, which lists the pickle files found in the directory indir"""
    pklfiles = listpkl(indir)
    return pklfiles
def premkdir(filename)

pre-create directory /path/to/subdir using remkdir() if it does not exist for outfile=/path/to/subdir/file.ext, and return filename

Expand source code Browse git
def premkdir(filename):
    """Make sure the parent directory of filename exists, then hand filename back.

    For filename='/path/to/subdir/file.ext', the directory '/path/to/subdir' is created with `vipy.util.remkdir` if missing.  The file itself is not created.
    """
    parent = filepath(filename)
    remkdir(parent)
    return filename
def print_and_return(x)
Expand source code Browse git
def print_and_return(x):
    """Print x to stdout and return it unchanged, so a value can be inspected in the middle of an expression"""
    print(x)
    return x
def readcsv(infile, separator=',', ignoreheader=False, comment=None, ignore_header=False)

Read a csv file into a list of lists, ignore any rows prepended with comment symbol, ignore first row if ignoreheader=True

Args

infile
the csv file input
separator
a string specifying the separator between columns. defaults to ','
ignoreheader
if true, ignore the first row of the csv file
ignore_header
if true, ignore the first row of the csv file (argument synonym)
comment
if provided, ignore all rows with this comment symbol prepended

Returns

a list of lists, each list element containing a list of elements in the corresponding line of the csv file, parsed by separator

Expand source code Browse git
def readcsv(infile, separator=',', ignoreheader=False, comment=None, ignore_header=False):
    """Read a csv file into a list of lists, optionally dropping the first row and any commented rows.

    Args:
        infile: the csv file input
        separator: a string specifying the separator between columns.  defaults to ','
        ignoreheader: if true, ignore the first row of the csv file
        ignore_header: if true, ignore the first row of the csv file (argument synonym)
        comment:  if provided (and non-empty), ignore all rows whose first element starts with this comment string

    Returns:
        a list of lists, each list element containing a list of whitespace-stripped elements in the corresponding line of the csv file, parsed by separator.  A blank line is returned as [''].

    .. note:: The header row (if ignored) is removed before the comment filter is applied.
    """

    with open(infile, 'r') as f:
        rows = [[x.strip() for x in line.split(separator)] for line in f]

    if rows and (ignoreheader or ignore_header):
        rows = rows[1:]

    if comment is not None and len(comment) > 0:
        # startswith() handles blank lines (first element '') and multi-character
        # comment strings such as '# ', where indexing the first character did not
        rows = [r for r in rows if not r[0].startswith(comment)]
    return rows
def readcsvwithheader(infile, separator=',')

Read a csv file into a list of lists

Expand source code Browse git
def readcsvwithheader(infile, separator=','):
    """Read a csv file whose first row is a header.

    Args:
        infile: the csv file input
        separator: a string specifying the separator between columns.  defaults to ','

    Returns:
        A tuple (rows, header_dict).  rows is a list of lists of whitespace-stripped elements for every line after the first.  header_dict maps each header name in the first line to its column index (for duplicated names, the last column wins).  An empty file returns ([], {}).
    """
    with open(infile, 'r') as f:
        rows = [[x.strip() for x in line.split(separator)] for line in f]
    if not rows:
        return [], {}  # no header line to index
    header_dict = {name: k for (k, name) in enumerate(rows[0])}
    return rows[1:], header_dict
def readjson(jsonfile, strict=True)

Read jsonfile=/path/to/file.json and return the json parsed object, issue warning if jsonfile does not have .json extension and strict=True

Expand source code Browse git
def readjson(jsonfile, strict=True):
    """Parse the JSON file at jsonfile=/path/to/file.json and return the decoded object.

    Args:
        jsonfile: path to the file to read
        strict: if true, issue a warning (but still read the file) when jsonfile does not pass `vipy.util.isjsonfile`

    Returns:
        The parsed JSON object
    """
    wrongext = not isjsonfile(jsonfile)
    if wrongext and strict:
        warnings.warn('Attempting to read JSON file "%s" without .json extension' % jsonfile)
    with open(jsonfile) as f:
        return json.loads(f.read())
def readlist(infile)

Read each row of file as an element of the list

Expand source code Browse git
def readlist(infile):
    """Return the lines of a text file as a list of strings, one per row, with surrounding whitespace stripped from each"""
    with open(infile, 'r') as f:
        return [line.strip() for line in f]
def readtxt(infile)

Read a text file one string per row

Expand source code Browse git
def readtxt(infile):
    """Alias for `vipy.util.readlist`: read a text file into a list with one stripped string per row"""
    rows = readlist(infile)
    return rows
def readyaml(yamlfile)

Read a yaml file and return a parsed dictionary, this is slow for large yaml files

Expand source code Browse git
def readyaml(yamlfile):
    """Parse a yaml file and return the loaded object.  This is slow for large yaml files.

    Requires the optional package pyyaml; a helpful ImportError is raised by `vipy.util.try_import` if it is missing.
    """
    try_import('yaml', 'pyyaml')
    import yaml
    with open(yamlfile, 'r') as f:
        text = f.read()
    # NOTE(review): yaml.Loader can construct arbitrary python objects, so only read trusted files.
    # yaml.CLoader is faster, but not installed via pip
    return yaml.load(text, Loader=yaml.Loader)
def remkdir(path, flush=False)

Create a given directory if not already exists

Expand source code Browse git
def remkdir(path, flush=False):
    """Create a given directory (and any missing parents) if it does not already exist.

    Args:
        path: the directory to create.  An empty string is a no-op, nothing is created or deleted
        flush: if True and the directory already exists, recursively delete it and recreate it empty

    Returns:
        The absolute path of the directory, with '~' expanded

    Raises:
        FileExistsError: if path exists and is not a directory

    .. note:: Creation uses exist_ok=True, so concurrent workers creating the same directory do not race between the existence check and the creation.
    """
    if len(path) > 0:
        if flush is True and os.path.isdir(path):
            shutil.rmtree(path)
        os.makedirs(path, exist_ok=True)
    # NOTE(review): the directory is created at `path` as given, while the returned path is
    # user-expanded, so a path containing '~' returns a location that was not created - confirm intent
    return os.path.abspath(os.path.expanduser(path))
def repath(v, srcpath, dstpath)

Change the filename with prefix srcpath to dstpath, for any element in v that supports the filename() api

Expand source code Browse git
def repath(v, srcpath, dstpath):
    """Rewrite filenames so that the (normalized) path srcpath is replaced by the (normalized) path dstpath.

    Args:
        v: a string, an object that has both filename() and clone() attributes, or a list of such objects
        srcpath: the path substring to replace
        dstpath: the replacement path

    Returns:
        For a string, the rewritten string.  For an object (or each element of a list), the result of calling filename(newname) on it, or the object untouched if its filename() is None.

    Raises:
        ValueError: if v is none of the supported inputs
    """

    def _isrepathable(obj):
        return hasattr(obj, 'filename') and hasattr(obj, 'clone')

    def _relocated(obj):
        if obj.filename() is None:
            return obj
        return obj.filename(obj.filename().replace(os.path.normpath(srcpath), os.path.normpath(dstpath)))

    if not islist(v) and _isrepathable(v):
        return _relocated(v)
    if islist(v) and all([_isrepathable(vv) for vv in v]):
        return [_relocated(vv) for vv in v]
    if isstring(v):
        return v.replace(os.path.normpath(srcpath), os.path.normpath(dstpath))
    raise ValueError('Input must be a singleton or list of vipy.image.Image() or vipy.video.Video() objects, not type "%s"' % (str(type(v))))
def rermdir(path)

Recursively delete a given directory (if exists), and remake it

Expand source code Browse git
def rermdir(path):
    """Flush a directory: shorthand for `vipy.util.remkdir` with flush=True, which deletes an existing directory and remakes it"""
    flushed = remkdir(path, flush=True)
    return flushed
def rgb2bgr(im_rgb)

same as bgr2rgb

Expand source code Browse git
def rgb2bgr(im_rgb):
    """Convert an RGB image to BGR by delegating to bgr2rgb, which is used here for both directions"""
    im_bgr = bgr2rgb(im_rgb)
    return im_bgr
def rmdir(indir)

Recursively remove directory and all contents (if the directory exists)

Expand source code Browse git
def rmdir(indir):
    """Delete the directory indir and everything under it, doing nothing if indir is not an existing directory.

    Returns:
        indir, unchanged
    """
    if not os.path.isdir(indir):  # false for missing paths and for regular files
        return indir
    shutil.rmtree(indir)
    return indir
def save(vars, outfile=None, backup=False)

Save variables to an archive file.

This function allows vipy objects to be serialized to disk for later loading.

im = vipy.image.owl()
im = vipy.util.load(vipy.util.save(im))   # round trip

Args

vars
A python object to save. This can be any serializable python object
outfile
An output file to save. Must have extension [.pkl, .json, .pkl.bz2]. If None, will save to a temporary JSON file.

backup
[bool] If true and the outfile already exists, make a copy and save as outfile.bak before overwriting

Returns

A path to the saved archive file. Load using load().

Note: JSON is preferred as an archive format for vipy. Be sure to install the excellent ultrajson library (pip install ujson) for fast serialization.

Expand source code Browse git
def save(vars, outfile=None, backup=False):
    """Save variables to an archive file.

    This function allows vipy objects to be serialized to disk for later loading.

    ```python
    im = vipy.image.owl()
    im = vipy.util.load(vipy.util.save(im))   # round trip
    ```

    Args:
        vars: A python object to save.  This can be any serializable python object
        outfile:  An output file to save.  Must have extension [.pkl, .json, .pkl.bz2].  If None, will save to a temporary JSON file.
        backup [bool]:  If true and the outfile already exists, make a copy and save as outfile.bak before overwriting

    Returns:
        A path to the saved archive file.  Load using `vipy.util.load`.

    Raises:
        ValueError: if outfile does not have a supported extension

    .. note:: JSON is preferred as an archive format for vipy.  Be sure to install the excellent ultrajson library (pip install ujson) for fast serialization.
    """
    allowable = set(['.pkl', '.json', '.pkl.bz2'])
    outfile = tempjson() if outfile is None else outfile

    if backup and os.path.exists(outfile):
        shutil.copyfile(outfile, outfile+'.bak')
    remkdir(filepath(outfile))

    if ispkl(outfile):
        # Close the handle deterministically so the pickle is fully flushed before the path is returned
        with open(outfile, 'wb') as f:
            dill.dump(vars, f)

    elif isjsonfile(outfile):
        # Registered vipy objects are wrapped as {str(type): json} so the loader can pick the matching deserializer
        registry = class_registry()
        if isinstance(vars, list) and all(str(type(d)) in registry for d in vars):
            j = [{str(type(d)):d.json(encode=False)} for d in vars]
        elif str(type(vars)) in registry:
            j = {str(type(vars)):vars.json(encode=False)}
        else:
            j = vars

        s = json.dumps(j, ensure_ascii=False)  # load to memory (faster than json.dump), will throw exception if it cannot serialize
        with open(outfile, 'w') as f:
            f.write(s)

    elif ispklbz2(outfile):
        return bz2pkl(outfile, vars)
    else:
        raise ValueError('Unknown file extension for save file "%s" - must be in %s' % (fileext(outfile), str(allowable)))

    return os.path.abspath(outfile)
def save_opencv_yaml(yamlfile, mat)

Save a numpy array to YAML file importable by OpenCV

Expand source code Browse git
def save_opencv_yaml(yamlfile, mat):
    """Write a numpy array to a YAML file in the '!!opencv-matrix' layout importable by OpenCV.

    Args:
        yamlfile: the output path
        mat: a numpy array.  Rows are taken from shape[0] and columns from shape[1] for a 2D array, otherwise the column count is written as 1.  Entries are written flattened in '%.6e' format, four per line.

    Returns:
        yamlfile
    """

    def _entries(M):
        """Yield the text for each flattened entry of M, including its trailing separator and line break"""
        last = M.size - 1
        for (idx, val) in enumerate(M.flatten()):
            chunk = '%.6e' % val
            if idx == last:
                yield chunk  # final entry: no trailing separator
                return
            chunk += ', '
            if (idx + 1) % 4 == 0:
                chunk += '\n           '  # wrap after every fourth entry
            yield chunk

    with open(yamlfile, 'w') as f:
        f.write('%YAML:1.0\n')
        f.write('    mtx_01: !!opencv-matrix\n')
        f.write('       rows: %d\n' % mat.shape[0])
        ncols = mat.shape[1] if mat.ndim == 2 else 1
        f.write('       cols: %d\n' % ncols)
        f.write('       dt: f\n')
        f.write('       data: [ ')
        f.write(''.join(_entries(mat)))
        f.write(']\n')

    return yamlfile
def savetemp(img)
Expand source code Browse git
def savetemp(img):
    """Save a numpy array as a uniquely named PNG in the system temp directory and return the filename.

    Args:
        img: a numpy array.  It is cast to uint8 before saving (presumably an image-shaped array accepted by PIL.Image.fromarray)

    Returns:
        The path to the saved PNG file
    """
    # Use the platform temp directory rather than a hard-coded '/tmp', matching `vipy.util.tempfilename`
    f = os.path.join(tempfile.gettempdir(), '%s.png' % uuid.uuid1().hex)
    PIL.Image.fromarray(img.astype(np.uint8)).save(f)
    return f
def scpload(url)

Load an archive file saved using scpsave()

Expand source code Browse git
def scpload(url):
    """Fetch an archive saved with `vipy.util.scpsave` and load it.

    The archive at url is copied with vipy.downloader.scp to a new temporary file with the same extension, and the local copy is passed to `vipy.util.load`.
    """
    import vipy.downloader
    localfile = vipy.downloader.scp(url, templike(url))
    return load(localfile)
def scpsave(V, username=None)

Save an archive file to load via SCP.

Use case:

  • This archive format is useful to allow access to videos and images that are accessible behind a remote server for which you have access via SSH key-based authentication.
  • You create this archive on the remote server, and all vipy objects are replaced with references to remote media.
  • Every video or image is replaced with a URL of the format 'scp://USER@HOST:/path/to.mp4'.
  • Vipy will use your SSH keys to SCP these media files from USER@HOST on demand, so that the videos are cached for you on your local machine when you need them.
  • This is useful for transparently visualizing large datasets that are hidden behind an SSH-only accessible server

Usage:

outfile = vipy.util.scpsave([vipy.video.Video(filename='/path/to.mp4')])  # run on remote machine that you have SSH key access
V = vipy.util.scpload(outfile)  # run on local machine that has SSH key access to remote machine
V[0].load()  # this will SCP the videos from 'scp:///path/to.mp4' to $VIPY_CACHE/to.mp4 transparently and on demand

Args

V
[vipy objects] A list of vipy objects or Dataset
username
[str] Your username on the remote machine to select the proper SSH key

Returns

A temp archive file stored on the remote machine that will be downloaded and loaded via SCP, such that each element in the list will be fetched via scp when pixels are loaded.

Expand source code Browse git
def scpsave(V, username=None):
    """Save an archive file to load via SCP.

    Use case:

    - This archive format is useful to allow access to videos and images that are accessible behind a remote server for which you have access via SSH key-based authentication.
    - You create this archive on the remote server, and all vipy objects are replaced with references to remote media.
    - Every video or image is replaced with a URL of the format 'scp://USER@HOST:/path/to.mp4'.  
    - Vipy will use your SSH keys to SCP these media files from USER@HOST on demand, so that the videos are cached for you on your local machine when you need them.
    - This is useful for transparently visualizing large datasets that are hidden behind an SSH-only accessible server

    Usage:
    
    ```python
    outfile = vipy.util.scpsave([vipy.video.Video(filename='/path/to.mp4')])  # run on remote machine that you have SSH key access
    V = vipy.util.scpload(outfile)  # run on local machine that has SSH key access to remote machine
    V[0].load()  # this will SCP the videos from 'scp:///path/to.mp4' to $VIPY_CACHE/to.mp4 transparently and on demand
    ```

    Args:
        V: [vipy objects] A list of vipy objects or `vipy.dataset.Dataset`
        username: [str] Your username on the remote machine to select the proper SSH key

    Returns:
        The 'scp://[USER@]HOST:/path' URL of a temp pickle archive stored on this (remote) machine.  Loading it with `vipy.util.scpload` fetches each element via scp when pixels are loaded.  Inputs that are not vipy objects are archived unchanged.

    """
    
    import vipy.image
    import vipy.video
    import vipy.dataset  # referenced below, so import it explicitly rather than relying on an earlier import elsewhere

    # 'scp://[USER@]HOST' prefix shared by every media URL and by the archive URL itself
    remote = 'scp://%s%s' % (('%s@' % username) if username is not None else '', socket.gethostname())

    def _url(path):
        return '%s:%s' % (remote, path)

    if isinstance(V, vipy.dataset.Dataset) and V._isvipy():
        v = V.localmap(lambda v: v.clone().url(_url(v.filename())).nofilename())
    elif (isinstance(V, vipy.image.Image) or isinstance(V, vipy.video.Video)) and V.hasfilename():        
        v = V.clone().url(_url(V.filename())).nofilename()
    elif islist(V) and all([isinstance(v, vipy.image.Image) or isinstance(v, vipy.video.Video) for v in V]):
        v = [vv.clone().url(_url(vv.abspath().filename())).nofilename() for vv in V]
    else:
        v = V # no vipy objects

    pklfile = _url(save(v, temppkl()))
    cmd = "V = vipy.util.scpload('%s')" % pklfile
    print('[vipy.util.scpsave]: On a local machine where you have public key ssh access to this remote machine run:\n>>> %s\n' % cmd)
    return pklfile
def seconds_to_MMSS_colon_notation(sec)

Convert integer seconds into MM:SS colon format. If sec=121, then return '02:01'.

Expand source code Browse git
def seconds_to_MMSS_colon_notation(sec):
    """Format a whole number of seconds as a zero-padded 'MM:SS' string, so that sec=121 gives '02:01'.

    The input must be an int in the range [0, 99*60+59], the largest value that fits in two minute digits.
    """
    assert isinstance(sec, int) and sec <= 99*60 + 59 and sec >= 0
    (minutes, seconds) = divmod(sec, 60)
    return '%02d:%02d' % (minutes, seconds)
def seq(start, stop, step=1)

Equivalent to matlab [start:step:stop]

Expand source code Browse git
def seq(start, stop, step=1):
    """Equivalent to matlab [start:step:stop]

    Args:
        start: the first element
        stop: the inclusive upper bound (lower bound for a negative step).  No element goes past stop
        step: the non-zero increment between elements

    Returns:
        The list [start, start+step, start+2*step, ...] of all elements that do not go past stop.  Returns [start] when start==stop, and [] when stop cannot be reached from start in the direction of step.
    """
    nsteps = (stop - start) / float(step)
    tol = 1E-9  # absorbs floating point error, such as 0.3/0.1 == 2.9999999999999996
    if nsteps + tol < 0:
        return []
    n = int(nsteps + tol)  # whole steps that fit before stop: round down, never overshoot
    return [start + step * i for i in range(n + 1)]
def shortuuid(n=16)

Generate a short UUID with n hex digits

Expand source code Browse git
def shortuuid(n=16):
    """Return a new identifier made of the first n hex digits of the SHA-256 digest of a fresh uuid1 (at most 64 digits are available)"""
    seed = uuid.uuid1().hex.encode('utf-8')
    digest = hashlib.sha256(seed).hexdigest()
    return digest[0:n]
def softmax(x, temperature=1.0)

Row-wise softmax

Expand source code Browse git
def softmax(x, temperature=1.0):
    """Row-wise softmax of a 2D array.

    Args:
        x: a 2D numpy array, one set of scores per row
        temperature: the divisor applied to the scores before exponentiation

    Returns:
        An array with the same shape as x in which every row sums to one
    """
    assert x.ndim == 2
    rowmax = np.max(x, axis=1, keepdims=True)  # subtracted so the largest exponent in each row is zero
    z = np.exp((x - rowmax) / temperature)
    return z / np.sum(z, axis=1, keepdims=True)
def splitext(filename)

Given /a/b/c.ext return tuple of strings ('/a/b/c', '.ext'), handling multi-dot extensions like .tar.gz

Expand source code Browse git
def splitext(filename):
    """Split /a/b/c.ext into the tuple of strings ('/a/b/c', '.ext').

    The extension is taken from `vipy.util.fileext` with multidot=True and withdot=True, so multi-dot extensions like .tar.gz are split off as a unit.  If no extension is found, the second element is None.
    """
    (head, tail) = os.path.split(filename)
    ext = fileext(filename, multidot=True, withdot=True)
    if ext is None:
        base = tail
    else:
        base = tail.replace(ext, '')
    return (os.path.join(head, base), ext)  # same tuple order as os.path.splitext
def string_to_pil_interpolation(interp)

Internal function to convert interp string to interp object

Expand source code Browse git
def string_to_pil_interpolation(interp):
    """Internal function to convert interp string to the PIL interpolation constant.

    Args:
        interp: one of 'bilinear', 'bicubic' or 'nearest'

    Returns:
        PIL.Image.BILINEAR, PIL.Image.BICUBIC or PIL.Image.NEAREST respectively

    Raises:
        AssertionError: if interp is not one of the supported strings (ValueError if assertions are disabled)
    """
    assert interp in ['bilinear', 'bicubic', 'nearest'], "Invalid interp - Must be in ['bilinear', 'bicubic', 'nearest']"
    if interp == 'bilinear':
        return PIL.Image.BILINEAR
    elif interp == 'bicubic':
        return PIL.Image.BICUBIC
    elif interp == 'nearest':
        return PIL.Image.NEAREST
    # Only reachable when assertions are stripped (python -O).  A bare `raise` here
    # would fail with "No active exception to reraise".
    raise ValueError("Invalid interp - Must be in ['bilinear', 'bicubic', 'nearest']")
def stringhash(s, n=16)

Generate a repeatable hash with n characters for a string s

Expand source code Browse git
def stringhash(s, n=16):
    """Return the first n hex digits of the SHA-256 digest of the UTF-8 encoding of string s, which is repeatable for equal strings"""
    digest = hashlib.sha256(s.encode('utf-8')).hexdigest()
    return digest[0:n]

def symlink(src, dst, overwrite=False)

Create a symlink from src to dst, overwriting the existing symlink at dst if overwrite=True

Expand source code Browse git
def symlink(src, dst, overwrite=False):
    """Make dst a symbolic link pointing at src and return dst.

    Args:
        src: the link target
        dst: the path of the link to create
        overwrite: if true and dst is currently a symlink, remove it first.  Existing paths that are not symlinks are never removed.
    """
    stale = overwrite and os.path.islink(dst)
    if stale:
        os.unlink(dst)
    os.symlink(src, dst)
    return dst
def take(inlist, k)

Take k elements at random from inlist

Expand source code Browse git
def take(inlist, k):
    """Draw k elements at random (without repeats of position) from inlist.

    If inlist has k or fewer elements, inlist itself is returned, unshuffled and uncopied.
    """
    if not len(inlist) > k:
        return inlist
    order = np.random.permutation(range(len(inlist)))
    return [inlist[i] for i in order[0:k]]
def takelast(inlist)

Take last element from inlist or return None if empty

Expand source code Browse git
def takelast(inlist):
    """Return the final element of inlist (after conversion with `vipy.util.tolist`), or None if it is empty"""
    items = tolist(inlist)
    if len(items) >= 1:
        return items[-1]
    return None
def takeone(inlist)

Take one element at random from inlist or return None if empty

Expand source code Browse git
def takeone(inlist):
    """Return a single element drawn at random from inlist using `vipy.util.take`, or None if inlist is empty"""
    if not len(inlist) >= 1:
        return None
    return take(list(inlist), k=1)[0]
def tempMP4()

Create a temporary MP4 file in system temp directory

Expand source code Browse git
def tempMP4():
    """Return a new unique filename with a '.mp4' suffix from `vipy.util.tempfilename`"""
    mp4file = tempfilename(suffix='.mp4')
    return mp4file
def tempWEBP()

Create a temporary WEBP file in system temp directory

Expand source code Browse git
def tempWEBP():
    """Return a new unique filename with a '.webp' suffix from `vipy.util.tempfilename`"""
    webpfile = tempfilename(suffix='.webp')
    return webpfile
def tempcsv()

Create a temporary CSV file

Expand source code Browse git
def tempcsv():
    """Return a new unique filename with a '.csv' suffix from `vipy.util.tempfilename`"""
    csvfile = tempfilename(suffix='.csv')
    return csvfile
def tempdir()

Wrapper around tempfile, because I can never remember the syntax

Expand source code Browse git
def tempdir():
    """Return the system temp directory as reported by tempfile.gettempdir(), under an easier to remember name"""
    systemp = tempfile.gettempdir()
    return systemp
def tempfilename(suffix)

Create a temporary filename $TEMPDIR/$UUID.suffix, suffix should include the dot such as suffix='.jpg',

Expand source code Browse git
def tempfilename(suffix):
    """Build a new filename of the form $TEMPDIR/$UUID$suffix.

    Args:
        suffix: the file extension including the dot, such as suffix='.jpg'

    Returns:
        A path in tempfile.gettempdir() whose basename is a `vipy.util.shortuuid` followed by suffix.  Only the name is generated, the file is not created.
    """
    basename = '%s%s' % (shortuuid(), suffix)
    return os.path.join(tempfile.gettempdir(), basename)
def temphtml()

Create a temporary HTMLfile

Expand source code Browse git
def temphtml():
    """Return a new unique filename with a '.html' suffix from `vipy.util.tempfilename`"""
    htmlfile = tempfilename(suffix='.html')
    return htmlfile
def tempimage(ext='jpg')

Create a temporary image with the given extension

Expand source code Browse git
def tempimage(ext='jpg'):
    """Return a new unique filename from `vipy.util.tempfilename` with the given image extension, which may be given as 'jpg' or '.jpg'"""
    suffix = ext if ext[0] == '.' else ('.' + ext)  # normalize to exactly one leading dot
    return tempfilename(suffix=suffix)
def tempjpg()

Create a temporary JPG file in system temp directory

Expand source code Browse git
def tempjpg():
    """Return a new unique '.jpg' filename in the system temp directory, from `vipy.util.tempimage`"""
    jpgfile = tempimage('jpg')
    return jpgfile
def tempjson()

Create a temporary JSON file

Expand source code Browse git
def tempjson():
    """Return a new unique filename with a '.json' suffix from `vipy.util.tempfilename`"""
    jsonfile = tempfilename(suffix='.json')
    return jsonfile
def templike(filename)

Create a new temporary filename with the same extension as filename

Expand source code Browse git
def templike(filename):
    """Return a new unique filename from `vipy.util.tempfilename` that has the same extension (as given by `vipy.util.fileext`) as filename"""
    ext = fileext(filename)
    return tempfilename(ext)
def temppdf()

Create a temporary PDF file

Expand source code Browse git
def temppdf():
    """Return a new unique filename with a '.pdf' suffix from `vipy.util.tempfilename`"""
    pdffile = tempfilename(suffix='.pdf')
    return pdffile
def temppickle()

Create a temporary pickle file

Expand source code Browse git
def temppickle():
    """Return a new unique filename with a '.pkl' suffix from `vipy.util.tempfilename`"""
    pklfile = tempfilename(suffix='.pkl')
    return pklfile
def temppkl()

Create a temporary pickle file

Expand source code Browse git
def temppkl():
    """Alias for `vipy.util.temppickle`: return a new unique '.pkl' filename"""
    pklfile = temppickle()
    return pklfile
def temppng()

Create a temporary PNG file

Expand source code Browse git
def temppng():
    """Return a new unique '.png' filename from `vipy.util.tempimage`"""
    pngfile = tempimage('png')
    return pngfile
def tempyaml()

Create a temporary YAML file

Expand source code Browse git
def tempyaml():
    """Return a new unique filename with a '.yml' suffix from `vipy.util.tempfilename`"""
    ymlfile = tempfilename(suffix='.yml')
    return ymlfile
def timestamp()

Return the local date and time as an uppercase string in the form DDMMMYY_HHMMSS{AM|PM}, using a 12-hour clock (e.g. 05JAN24_031502PM)

Expand source code Browse git
def timestamp():
    """Return the local date and time as an uppercase string in the form DDMMMYY_HHMMSS{AM|PM}, using a 12-hour clock (e.g. '05JAN24_031502PM')"""
    now = localtime()
    return strftime("%d%b%y_%I%M%S%p", now).upper()
def tmpjpg()

Create a temporary JPG file in /tmp

Expand source code Browse git
def tmpjpg():
    """Return a new unique filename of the form /tmp/$UUID4.jpg.  The directory is always the literal /tmp."""
    basename = uuid.uuid4().hex
    return '/tmp/%s.jpg' % basename
def tocache(filename)

If the VIPY_CACHE environment variable is set, then return the filename=/path/to/file.ext in the cache as VIPY_CACHE/file.ext. Otherwise, return the file in the system temp

Expand source code Browse git
def tocache(filename):
    """Map filename=/path/to/file.ext to a path with the same basename in the vipy cache.

    If `vipy.util.hascache` reports a cache, the directory named by the VIPY_CACHE environment variable is created if needed and VIPY_CACHE/file.ext is returned.  Otherwise the file is placed in the system temp directory.
    """
    cachedir = remkdir(os.environ['VIPY_CACHE']) if hascache() else tempdir()
    return os.path.join(cachedir, filetail(filename))
def toextension(filename, newext)

Convert filename='/path/to/myfile.ext' to /path/to/myfile.xyz, such that newext='xyz' or newext='.xyz'

Expand source code Browse git
def toextension(filename, newext):
    """Replace the extension of filename='/path/to/myfile.ext' to give /path/to/myfile.xyz, for newext='xyz' or newext='.xyz'.

    The old extension is split off with `vipy.util.splitext`.  If newext contains dots, only the text after its last dot is used.
    """
    suffix = newext.split('.')[-1] if '.' in newext else newext
    (stem, _oldext) = splitext(filename)
    return stem + '.' + str(suffix)
def tofilename(s, hyphen=True)

Convert arbitrary string to valid filename with underscores replacing invalid chars

Expand source code Browse git
def tofilename(s, hyphen=True):
    """Convert arbitrary string to valid filename with underscores replacing spaces, dropping all other invalid chars.

    Args:
        s: the input string
        hyphen: if true, also replace hyphens with underscores

    Returns:
        A string containing only ASCII letters, digits, '-', '_' and '.'
    """
    import string  # function scope: the character classes live in the string module, not on the str type

    valid_chars = frozenset("-_.%s%s" % (string.ascii_letters, string.digits))
    s = s.replace(' ', '_')
    if hyphen:
        s = s.replace('-', '_')
    return "".join(x for x in s if x in valid_chars)
def tolist(x)

Convert a python tuple or singleton object to a list if not already a list

Expand source code Browse git
def tolist(x):
    """Coerce x to a list: a list is returned as-is, a tuple or set is copied into a new list, and any other object is wrapped as the single element [x]"""
    if isinstance(x, list):
        return x
    if isinstance(x, (tuple, set)):
        return list(x)
    return [x]
def tolist_or_singleton(x)

Return list(x) if length of iterator x is not equal to one, else return x or None. This is useful to return single elements instead of single element lists.

Expand source code Browse git
def tolist_or_singleton(x):
    """Unwrap single element collections.

    x is first converted with `vipy.util.tolist`.  A result with more than one element is returned as a list, a single element is returned by itself, and an empty result returns None.  This is useful to return single elements instead of single element lists.
    """
    y = tolist(x)
    if len(y) > 1:
        return y
    return y[0] if len(y) == 1 else None
def topath(filename, newdir)

Alias for newpath()

Expand source code Browse git
def topath(filename, newdir):
    """Alias for `vipy.util.newpath`, called with the same (filename, newdir) arguments"""
    moved = newpath(filename, newdir)
    return moved
def topkl(filename)

Convert filename='/path/to/myfile.ext' to /path/to/myfile.pkl

Expand source code Browse git
def topkl(filename):
    """Swap the extension of filename='/path/to/myfile.ext' for '.pkl' using `vipy.util.toextension`, giving /path/to/myfile.pkl"""
    pklfile = toextension(filename, '.pkl')
    return pklfile
def toset(x)

Convert a python iterable to a set if not already a set

Expand source code Browse git
def toset(x):
    """Coerce x to a set: a set is returned as-is, a list or tuple is converted to a set of its elements, and any other object is wrapped as the single element set {x}"""
    if isinstance(x, set):
        return x
    if isinstance(x, (list, tuple)):
        return set(x)
    return set([x])
def totempdir(filename)

Convert a filename '/path/to/filename.ext' to '/tempdir/filename.ext'

Expand source code Browse git
def totempdir(filename):
    """Convert a filename '/path/to/filename.ext' to '/tempdir/filename.ext', where /tempdir is tempfile.gettempdir() and the basename comes from `vipy.util.filetail`"""
    basename = filetail(filename)
    return os.path.join(tempfile.gettempdir(), basename)
def touch(filename, mystr='')

Create an empty file containing mystr

Expand source code Browse git
def touch(filename, mystr=''):
    """Create (or overwrite) a file containing str(mystr), which is an empty file by default.

    Args:
        filename: the file to write.  Any existing content is replaced
        mystr: the content, converted with str()

    Returns:
        None
    """
    # Context manager guarantees the handle is closed even if the write fails
    with open(filename, 'w') as f:
        f.write(str(mystr))
def try_import(package, pipname=None, message=None)

Show a helpful error message for missing optional packages

Expand source code Browse git
def try_import(package, pipname=None, message=None):
    """Show a helpful error message for missing optional packages.

    Args:
        package: the module name to import, such as 'yaml'
        pipname: the name to suggest for 'pip install' if it differs from package, such as 'pyyaml'
        message: if provided, use this as the error message instead of the default installation hint

    Returns:
        None if the package imports cleanly

    Raises:
        ImportError: if importing the package fails for any reason.  The original exception is chained as the cause.
    """
    try:
        importlib.import_module(package)
    except Exception as e:
        # Exception (not a bare except) so that KeyboardInterrupt and SystemExit propagate untouched
        if message is not None:
            raise ImportError(message) from e
        raise ImportError('Optional package "%s" not installed -  Run "pip install %s" or "pip install vipy[all]" ' % (package, package if pipname is None else pipname)) from e
def trycatcher(f, *args, **kwargs)

Call the function f with the provided arguments, and return (result) on success and (None) if there is any thrown exception. Useful for parallel processing

Expand source code Browse git
def trycatcher(f, *args, **kwargs):
    """Evaluate f(*args, **kwargs), mapping any raised Exception to a None result.

    Useful for parallel processing, where one failed element should not abort the whole map.

    Args:
        f: a callable
        *args, **kwargs: forwarded unchanged to f

    Returns:
        The return value of f, or None if f raised an Exception
    """
    assert callable(f)
    try:
        outcome = f(*args, **kwargs)
    except Exception:
        outcome = None
    return outcome
def tryload(infile, abspath=False)

Attempt to load a pkl file, and return the value if successful and None if not

Expand source code Browse git
def tryload(infile, abspath=False):
    """Attempt to load an archive file with `vipy.util.load`, and return the value if successful and None if not.

    Args:
        infile: the archive file to load
        abspath: forwarded unchanged to `vipy.util.load`

    Returns:
        The loaded object, or None if loading raised an Exception
    """
    try:
        return load(infile, abspath=abspath)
    except Exception:
        # Exception (not a bare except) so that Ctrl-C during a long load is not silently turned into None
        return None
def txtlist(imdir)

Return a list of absolute paths of *.txt files in current directory

Expand source code Browse git
def txtlist(imdir):
    """Return a list of paths of the text files directly inside imdir, skipping hidden files.

    Entries are selected with `vipy.util.istextfile` and `vipy.util.is_hiddenfile`, and each path is imdir joined with the entry name (so it is absolute only if imdir is).
    """
    found = []
    for item in os.listdir(imdir):
        if istextfile(item) and not is_hiddenfile(item):
            found.append(os.path.join(imdir, item))
    return found
def videolist(videodir)

return list of videos with absolute path in a directory

Expand source code Browse git
def videolist(videodir):
    """Return a list of absolute paths of the videos directly inside videodir, skipping hidden files.

    Entries are selected with `vipy.util.isvideo` and `vipy.util.is_hiddenfile`.
    """
    found = []
    for item in os.listdir(videodir):
        if (isvideo(item) and not is_hiddenfile(item)):
            found.append(os.path.abspath(os.path.join(videodir, item)))
    return found
def vipy_groupby(inset, keyfunc)

groupby on unsorted inset

Expand source code Browse git
def vipy_groupby(inset, keyfunc):
    """Alias for groupby(inset, keyfunc) as defined in this module, for grouping an unsorted inset by key"""
    grouped = groupby(inset, keyfunc)
    return grouped
def writecsv(list_of_tuples, outfile=None, mode='w', separator=',', header=None, comment='# ')

Write list of tuples to an output csv file with each list element on a row and tuple elements separated by commas.

Examples:

vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv')
vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv', separator=';')
vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv', header=('h1','h2','h3'))

Args

list_of_tuples
a list of tuples each tuple is a row
outfile
the csv file output
mode
'w' for overwrite, 'a' for append
separator
a string specifying the separator between columns. defaults to ','
header
a tuple containing strings to be appended to the first row of the csv file
comment
the comment symbol to be prepended to the header row

Returns

the outfile path

Expand source code Browse git
def writecsv(list_of_tuples, outfile=None, mode='w', separator=',', header=None, comment='# '):
    """Write list of tuples to an output csv file with each list element on a row and tuple elements separated by commas.

    Examples:
    ```python
    vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv')
    vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv', separator=';')
    vipy.util.writecsv([(1,2,3), (4,5,6)], '/tmp/out.csv', header=('h1','h2','h3'))
    ```

    Args:
        list_of_tuples: an iterable of rows (list, tuple, generator or numpy array); each row is a sized sequence of values
        outfile: the csv file output.  If None, the path returned by `tempcsv` is used
        mode: 'w' for overwrite, 'a' for append
        separator: a string specifying the separator between columns.  defaults to ','
        header: a tuple containing strings to be written as the first row of the csv file
        comment: the comment symbol to be prepended to the first field of the header row

    Returns:
        the outfile path
    """

    # Materialize the rows so that the header can be prepended regardless of the input container type
    rows = list_of_tuples.tolist() if isnumpy(list_of_tuples) else list(list_of_tuples)
    if header is not None:
        # Only the first header field carries the comment symbol
        rows = [tuple(comment + h if k == 0 else h for (k, h) in enumerate(header))] + rows

    outfile = os.path.abspath(os.path.expanduser(outfile)) if outfile is not None else tempcsv()
    with open(outfile, mode) as f:
        # A row with no elements emits nothing (not even a newline)
        f.writelines(separator.join(str(v) for v in row) + '\n' for row in rows if len(row) > 0)
    return outfile
def writejson(d, outfile)
Expand source code Browse git
def writejson(d, outfile):
    """Serialize d as JSON into outfile (overwriting it) and return outfile"""
    with open(outfile, 'w') as handle:
        json.dump(d, handle)
    return outfile
def writelist(mylist, outfile, mode='w')

Write list of strings to an output file with each row an element of the list

Expand source code Browse git
def writelist(mylist, outfile, mode='w'):
    """Write each element of mylist, converted with str(), on its own line of outfile.

    Args:
        mylist: an iterable of elements to write
        outfile: the output path; '~' is expanded and the path is made absolute
        mode: the file open mode, 'w' for overwrite or 'a' for append

    Returns:
        the expanded absolute output path
    """
    target = os.path.abspath(os.path.expanduser(outfile))
    with open(target, mode) as handle:
        handle.writelines(str(element) + '\n' for element in mylist)
    return target

Classes

class Failed (*args, **kwargs)

Raised when unit test fails to throw an exception

Expand source code Browse git
class Failed(Exception):
    """Raised when unit test fails to throw an exception.

    Carries no state or behavior beyond the built-in Exception.
    """

Ancestors

  • builtins.Exception
  • builtins.BaseException
class Stopwatch

Return elapsed system time in seconds between calls to enter and exit

Expand source code Browse git
class Stopwatch(object):
    """Measure elapsed system time in seconds.

    Usable as a context manager, in which case `elapsed` holds the seconds between
    enter and exit, or directly through `since`, `reset` and `duration`.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Restart the stopwatch at the current time and return self"""
        self.start = self.last = time.time()
        return self

    def __enter__(self):
        # Entering the context restarts both the start and the lap reference
        began = time.time()
        self.start = began
        self.last = began
        return self

    def __exit__(self, *args):
        # Record the exit time and the total seconds spent inside the context
        finished = time.time()
        self.end = finished
        self.elapsed = finished - self.start

    def since(self, start=False):
        """Return seconds since start (if start is True) or since the last call to this method"""
        now = time.time()
        reference = self.start if start is True else self.last
        self.last = now
        return now - reference

    def duration(self):
        """Time in seconds since last reset"""
        now = time.time()
        return now - self.start

Methods

def duration(self)

Time in seconds since last reset

Expand source code Browse git
def duration(self):
    """Time in seconds between self.start and now"""
    now = time.time()
    return now - self.start
def reset(self)
Expand source code Browse git
def reset(self):
    """Set both self.start and self.last to the current time and return self"""
    self.start = self.last = time.time()
    return self
def since(self, start=False)

Return seconds since start or last call to this method

Expand source code Browse git
def since(self, start=False):
    """Return seconds since self.start (if start is True) or since the last call to this method.

    Every call moves the self.last reference to the current time.
    """
    now = time.time()
    reference = self.start if start is True else self.last
    self.last = now
    return now - reference
class Timer (sprintf_next=None, sprintf_first=None)

Pretty print elapsed system time in seconds between calls to enter and exit

   t = Timer()
   [some code]
   print(t)
   [some more code]
   print(t)

   with Timer():
      [some code]
Expand source code Browse git
class Timer(object):
    """Pretty print elapsed system time in seconds between calls to enter and exit

    ```python
       t = Timer()
       [some code]
       print(t)
       [some more code]
       print(t)

       with Timer():
          [some code]
    ```

    Args:
        sprintf_next: a sprintf style string with two number fields (elapsed since the last display, total since begin),
            used for every display after the first.  If None, a default string is used.
        sprintf_first: a sprintf style string with one number field (elapsed since begin), used for the first display.
            If None, a default string is used.

    Raises:
        ValueError: if either display string cannot be formatted with the expected number of values

    Note:
        Displaying the timer (via `repr`, `print` or leaving a `with` block) starts a new lap, so each
        display changes what the next one reports.
    """

    def __init__(self, sprintf_next=None, sprintf_first=None):
        self._sprintf_next = '[vipy.util.timer]: elapsed=%1.6fs, total=%1.6fs' if sprintf_next is None else sprintf_next
        self._sprintf_first = '[vipy.util.timer]: elapsed=%1.6fs' if sprintf_first is None else sprintf_first
        self._begin = time.time()
        self._last = self._begin
        self._laps = 0
        try:
            # Dry run both format strings so that a bad string fails at construction rather than at display
            self._sprintf_next % (1.0, 1.0)
            self._sprintf_first % (1.0)
        except Exception as err:
            raise ValueError('Printed display string must be a sprintf style string with one or two number variable like "Elapsed=%1.6f since=%1.6f"') from err

    def __enter__(self):
        # Entering the context restarts the clock; the lap counter is left unchanged
        self._begin = time.time()
        self._last = self._begin
        return self

    def __exit__(self, *args):
        print(self.__repr__())

    def __repr__(self):
        # Sample the clock once so that elapsed, total and the new lap reference all agree
        now = time.time()
        if self._laps > 0:
            s = str(self._sprintf_next % (now - self._last, now - self._begin))
        else:
            s = str(self._sprintf_first % (now - self._begin))
        self._last = now
        self._laps += 1
        return s