Module vipy.batch

import os
import sys
from vipy.util import try_import, islist, tolist, tempdir, remkdir, chunklistbysize, listpkl, filetail, filebase
from itertools import repeat
import dill
dill.extend(False)  # https://github.com/uqfoundation/dill/issues/383
try_import('dask', 'dask distributed')
from dask.distributed import as_completed, wait
try_import('torch', 'torch');  import torch
import dask
import dask.config
from dask.distributed import Client
from dask.distributed import get_worker
import webbrowser  # used by Dask.dashboard()
import numpy as np
import tempfile
import warnings
import vipy.globals
import hashlib
import uuid
import shutil
dill.extend(True)  # https://github.com/uqfoundation/dill/issues/383


class Dask(object):
    """Dask distributed client"""

    def __init__(self, num_processes=None, dashboard=False, verbose=False, address=None, num_gpus=None):
        assert address is not None or num_processes is not None or num_gpus is not None, "Invalid input - must provide at least one of address, num_processes or num_gpus"

        if num_gpus is not None:
            assert num_processes is None, "Cannot specify both num_gpus and num_processes"
            num_processes = num_gpus   # coercing

        self._num_processes = num_processes

        os.environ['DASK_LOGGING__DISTRIBUTED'] = 'warning' if not verbose else 'info'
        os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT'] = "30s"
        os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP'] = "30s"
        os.environ['DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT'] = "30s"

        dask.config.set({'DISTRIBUTED.COMM.TIMEOUTS.CONNECT'.lower():os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT']})
        dask.config.set({'DISTRIBUTED.COMM.TIMEOUTS.TCP'.lower():os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP']})
        dask.config.set({'DISTRIBUTED.DEPLOY.LOST_WORKER_TIMEOUT'.lower():os.environ['DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT']})
        dask.config.refresh()

        # Worker env
        env = {'VIPY_BACKEND':'Agg',  # headless in workers
               'PYTHONPATH':os.environ['PYTHONPATH'] if 'PYTHONPATH' in os.environ else '',
               'PATH':os.environ['PATH'] if 'PATH' in os.environ else ''}

        if 'VIPY_CACHE' in os.environ:
            env.update({'VIPY_CACHE':os.environ['VIPY_CACHE']})
        if 'VIPY_AWS_ACCESS_KEY_ID' in os.environ:
            env.update({'VIPY_AWS_ACCESS_KEY_ID':os.environ['VIPY_AWS_ACCESS_KEY_ID']})            
        if 'VIPY_AWS_SECRET_ACCESS_KEY' in os.environ:
            env.update({'VIPY_AWS_SECRET_ACCESS_KEY':os.environ['VIPY_AWS_SECRET_ACCESS_KEY']})        
                    
        for (k,v) in os.environ.items():
            if k.startswith('DASK_'):
                env[k] = v
    
        if address is not None:
            # Distributed scheduler
            self._client = Client(name='vipy', address=address)

            # Update key environment variables on remote workers using out of band function (yuck)
            # Make sure that any environment variables are accessible on all machines!  (e.g. VIPY_CACHE)
            # If not, then you need to unset these variables from os.environ prior to calling Dask()
            def _f_setenv_remote(localenv):
                import os; os.environ.update(localenv)

            localenv = {k:v for (k,v) in os.environ.items() if k.startswith('VIPY_')}
            localenv.update( {'VIPY_BACKEND':'Agg'} )
            self._client.run(lambda env=localenv: _f_setenv_remote(env))

        else:
            # Local scheduler
            self._client = Client(name='vipy',
                                  address=address,  # to connect to distributed scheduler HOSTNAME:PORT
                                  scheduler_port=0,   # random
                                  dashboard_address=None if not dashboard else ':0',  # random port
                                  processes=True, 
                                  threads_per_worker=1,
                                  n_workers=num_processes, 
                                  env=env,
                                  direct_to_workers=True, 
                                  #memory_limit='auto',
                                  #silence_logs=20 if verbose else 40, 
                                  local_directory=tempfile.mkdtemp())

        self._num_gpus = num_gpus
        if self._num_gpus is not None:
            assert isinstance(self._num_gpus, int) and self._num_gpus > 0, "Number of GPUs must be > 0, not '%s'" % (str(self._num_gpus))
            assert self._num_gpus == self._num_processes
            wait([self._client.submit(vipy.globals.gpuindex, k, workers=wid) for (k, wid) in enumerate(self._client.scheduler_info()['workers'].keys())])


    def __repr__(self):
        if self._num_processes is not None or self._num_gpus is not None:
            # Local 
            return str('<vipy.globals.dask: %s%s>' % ('gpus=%d' % self.num_gpus() if self.num_gpus() is not None else 'processes=%d' % self.num_processes(), ', dashboard="%s"' % str(self._client.dashboard_link) if self.has_dashboard() else ''))
        elif self._client is not None:
            # Distributed
            return str('<vipy.globals.dask: %s>' % (str(self._client)))
        else:
            return str('<vipy.globals.dask: shutdown>')

    def num_gpus(self):
        return self._num_gpus

    def has_dashboard(self):
        return len(self._client.dashboard_link) > 0 if self._client is not None else False

    def dashboard(self):
        if self.has_dashboard():
            webbrowser.open(self._client.dashboard_link)
    
    def num_processes(self):
        return len(self._client.nthreads()) if self._client is not None else 0

    def shutdown(self):
        self._client.close()
        self._num_processes = None
        self._num_gpus = None
        vipy.globals.GLOBAL['DASK_CLIENT'] = None        
        return self

    def client(self):
        return self._client


class Batch():
    """vipy.batch.Batch class

    This class provides a representation of a set of vipy objects.  All of the objects must be of the same type, and an operation on the batch is applied to each element in parallel.

    Examples:

    >>> b = vipy.batch.Batch([vipy.image.Image(filename='img_%06d.png' % k) for k in range(0,100)])
    >>> b.map(lambda im: im.bgr())  
    >>> b.map(lambda im: np.sum(im.array())) 
    >>> b.map(lambda im, f: im.saveas(f), args=['out%d.jpg' % k for k in range(0,100)])
    
    >>> v = [vipy.video.RandomSceneActivity() for k in range(0,16)]
    >>> b = vipy.batch.Batch(v)
    >>> b.map(lambda v,k: v[k], args=[(k,) for k in range(0, len(v))])  # parallel interpolation

    >>> d = vipy.data.kinetics.Kinetics700('/path/to/kinetics').download().trainset()
    >>> b = vipy.batch.Batch(d)
    >>> b.map(lambda v: v.download().save())  # will download and clip the dataset in parallel

    >>> b.result()  # retrieve results after a sequence of map or filter chains
    >>> list(b)     # equivalent to b.result()

    Parallelism is configured globally before constructing the batch (e.g. with vipy.globals.parallel(n)); the Batch constructor does not take a process count.

    Args:
        strict: [bool] If true, raise an exception when a distributed task fails; otherwise return None for that element and print the exception
        as_completed: [bool] Return objects to the scheduler as they complete; this can introduce instabilities for large complex objects, use with caution
        ordered: [bool] If true, preserve the order of objects in objlist in distributed processing

    """    
             
    def __init__(self, objlist, strict=False, as_completed=False, warnme=False, minscatter=None, ordered=False):
        """Create a batch of homogeneous vipy.image objects from an iterable that can be operated on with a single parallel function call
        """
        assert isinstance(objlist, list), "Input must be a list"
        self._batchtype = type(objlist[0]) if len(objlist)>0 else type(None)
        assert all([isinstance(im, self._batchtype) for im in objlist]), "Invalid input - must be a homogeneous list of objects of the same type"                

        # Move this into map and disable using localmap
        if vipy.globals.dask() is None and warnme:
            print('[vipy.batch.Batch]: vipy.batch.Batch() is not set to use parallelism.  This is set using:\n    >>> with vipy.globals.parallel(n) for multi-processing with n processes\n    >>> vipy.globals.parallel(pct=0.8) for multiprocessing that uses a percentage of the current system resources\n    >>> vipy.globals.dask(address="SCHEDULER:PORT") which connects to a Dask distributed scheduler.\n    >>> vipy.globals.noparallel() to completely disable all parallelism.')

        # FIXME: this needs to go into Dask()
        #self._ngpu = ngpu
        #if ngpu is not None:
        #    assert isinstance(ngpu, int) and ngpu > 0, "Number of GPUs must be >= 0 not '%s'" % (str(ngpu))
        #    wait([self._client.submit(vipy.globals.gpuindex, k, workers=wid) for (k, wid) in enumerate(self._client.scheduler_info()['workers'].keys())])

        self._strict = strict
        self._as_completed = as_completed  # this may introduce instabilities for large complex objects, use with caution
        self._minscatter = minscatter
        self._ordered = ordered
        self._objlist = [(k,o) for (k,o) in enumerate(objlist)] if ordered else objlist

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        return self
    
    def __len__(self):
        return len(self._objlist)

    def __repr__(self):        
        return str('<vipy.batch: type=%s, len=%d%s>' % (str(self._batchtype), len(self), (', client=%s' % str(vipy.globals.dask())) if vipy.globals.dask() is not None else ''))

    def _client(self):
        return vipy.globals.dask()._client if vipy.globals.dask() is not None else None

    def _batch_wait(self, futures):
        try:
            results = []            
            for (k, batch) in enumerate(as_completed(futures, with_results=True, raise_errors=False).batches()):
                for (future, result) in batch:
                    if future.status != 'error':
                        results.append(result)  # not order preserving, will restore order in result()
                    else:
                        if self._strict:
                            typ, exc, tb = result
                            raise exc.with_traceback(tb)
                        else:
                            print('[vipy.batch]: future %s failed with error "%s" - SKIPPING' % (str(future), str(result)))
                        results.append(None)
                    del future, result  # distributed memory cleanup

                # Distributed memory cleanup
                del batch
  
            return results

        except KeyboardInterrupt:
            # warnings.warn('[vipy.batch]: batch cannot be restarted after killing with ctrl-c - You must create a new Batch()')
            #vipy.globals.dask().shutdown()
            #self._client = None
            return results
        except:
            # warnings.warn('[vipy.batch]: batch cannot be restarted after exception - Recreate Batch()')                
            raise
    
    def _wait(self, futures):
        assert islist(futures) and all([hasattr(f, 'result') for f in futures])
        if self._as_completed:
            return self._batch_wait(futures)        
        try:
            results = []            
            wait(futures)
            for f in futures:  
                try:
                    results.append(f.result()) 
                except:
                    if self._strict:
                        raise
                    try:
                        print('[vipy.batch]: future %s failed with error "%s" for batch "%s"' % (str(f), str(f.exception()), str(self)))
                    except:
                        print('[vipy.batch]: future failed')
                    results.append(None)
            return results

        except KeyboardInterrupt:
            # warnings.warn('[vipy.batch]: batch cannot be restarted after killing with ctrl-c - You must create a new Batch()')
            #vipy.globals.dask().shutdown()
            #self._client = None
            return None  
        except:
            # warnings.warn('[vipy.batch]: batch cannot be restarted after exception - Recreate Batch()')                
            raise

    def result(self):
        """Return the result of the batch processing, restoring the original order if constructed with ordered=True"""
        if self._ordered:
            objlist = {int(v[0]):v[1] for v in self._objlist if v is not None}
            return [objlist[k] if k in objlist else None for k in range(len(self._objlist))]  # restore order
        else:
            return self._objlist

    def __iter__(self):
        for x in self.result():
            yield x
            
    def map(self, f_lambda, args=None):
        """Run the lambda function on each of the elements of the batch and return the batch object.
        
        >>> iml = [vipy.image.RandomScene(512,512) for k in range(0,1000)]   
        >>> imb = vipy.batch.Batch(iml) 
        >>> imb.map(lambda im: im.rgb())  

        If args is provided, it must be a list with one element per batch element; each element is passed to the lambda as an additional positional argument (tuples are unpacked):

        >>> imb.map(lambda im, f: im.saveas(f), args=['out%d.jpg' % k for k in range(0,1000)])

        The lambda function f_lambda should not include closures, since the lambda is serialized (with dill) and shipped to worker processes that do not share the caller's scope.  If it does, construct the lambda with default parameter capture:

        >>> f = lambda x, prm1=42: x+prm1

        instead of:

        >>> prm1 = 42
        >>> f = lambda x: x+prm1

        """
        assert args is None or (islist(args) and len(args) == len(self._objlist)), "args must be a list with one element (or tuple of arguments) per batch element"
        c = self._client()

        f_args = (lambda o,a,f=f_lambda: f(o) if a is None else (f(o,*a) if isinstance(a, tuple) else f(o,a)))  # unpack optional per-element args
        f_lambda_ordered = (lambda x,a=None,f=f_args: (x[0], f(x[1],a))) if self._ordered else (lambda x,a=None,f=f_args: f(x,a))

        if c is None:
            self._objlist = [f_lambda_ordered(o) for o in self._objlist] if args is None else [f_lambda_ordered(o,a) for (o,a) in zip(self._objlist, args)]  # no parallelism
        else:
            objlist = c.scatter(self._objlist) if (self._minscatter is not None and len(self._objlist) >= self._minscatter) else self._objlist
            self._objlist = self._wait(c.map(f_lambda_ordered, objlist) if args is None else c.map(f_lambda_ordered, objlist, args))
        return self

    def filter(self, f_lambda):
        """Run the lambda function on each element of the batch and keep only those elements for which the lambda returns true 
        """
        c = self._client()
        f_lambda_ordered = (lambda x,f=f_lambda: (x[0], f(x[1]))) if self._ordered else f_lambda
        f_keep = (lambda r: r is not None and r[1] is True) if self._ordered else (lambda r: r is True)  # results are (index, bool) pairs when ordered

        if c is None:
            self._objlist = [o for o in self._objlist if f_keep(f_lambda_ordered(o))]  # no parallelism
        else:
            objlist = self._objlist  # original list
            is_filtered = self._wait(c.map(f_lambda_ordered, c.scatter(self._objlist)))  # distributed filter
            self._objlist = [obj for (r, obj) in zip(is_filtered, objlist) if f_keep(r)]  # keep only elements that filter true
        return self
        
    def scattermap(self, f_lambda, obj):
        """Scatter obj to all workers, and apply the lambda function f(obj, im) to each element in the batch
        
           Usage: 
         
           >>> Batch(mylist).scattermap(lambda net, im: net(im), net).result()
        
           This will scatter the large object net to all workers and apply the lambda to each element of the batch.  To pin each
           worker to a specific GPU, construct the client with vipy.batch.Dask(num_gpus=n); within the net object, you can then call
           vipy.globals.gpuindex() to retrieve the GPU index assigned to the current worker, which can be used by torch.cuda.device().
           Each net is replicated in a different process, so it is the caller's responsibility to read vipy.globals.gpuindex() from
           within the process and set up net to use that GPU rather than the default cuda:0.  

        """
        c = self._client()
        f_lambda_ordered = (lambda net,x,f=f_lambda: (x[0], f(net,x[1]))) if self._ordered else (lambda net,x,f=f_lambda: f(net, x))

        if c is None:
            self._objlist = [f_lambda_ordered(obj, o) for o in self._objlist]  # no parallelism
        else:
            objdist = c.scatter(obj, broadcast=True)        
            objlist = c.scatter(self._objlist) if (self._minscatter is not None and len(self._objlist) >= self._minscatter) else self._objlist
            self._objlist = self._wait([c.submit(f_lambda_ordered, objdist, im) for im in objlist])
        return self

Classes

class Batch (objlist, strict=False, as_completed=False, warnme=False, minscatter=None, ordered=False)

vipy.batch.Batch class

This class provides a representation of a set of vipy objects. All of the objects must be of the same type, and an operation on the batch is applied to each element in parallel.

Examples:

>>> b = vipy.batch.Batch([vipy.image.Image(filename='img_%06d.png' % k) for k in range(0,100)])
>>> b.map(lambda im: im.bgr())  
>>> b.map(lambda im: np.sum(im.array())) 
>>> b.map(lambda im, f: im.saveas(f), args=['out%d.jpg' % k for k in range(0,100)])
>>> v = [vipy.video.RandomSceneActivity() for k in range(0,16)]
>>> b = vipy.batch.Batch(v)
>>> b.map(lambda v,k: v[k], args=[(k,) for k in range(0, len(v))])  # parallel interpolation
>>> d = vipy.data.kinetics.Kinetics700('/path/to/kinetics').download().trainset()
>>> b = vipy.batch.Batch(d)
>>> b.map(lambda v: v.download().save())  # will download and clip the dataset in parallel
>>> b.result()  # retrieve results after a sequence of map or filter chains
>>> list(b)     # equivalent to b.result()

Parallelism is configured globally before constructing the batch (e.g. with vipy.globals.parallel(n)); the Batch constructor does not take a process count.

Args

strict
[bool] If true, raise an exception when a distributed task fails; otherwise return None for that element and print the exception
as_completed
[bool] Return objects to the scheduler as they complete; this can introduce instabilities for large complex objects, use with caution
ordered
[bool] If true, preserve the order of objects in objlist in distributed processing

Create a batch of homogeneous vipy objects from a list that can be operated on with a single parallel function call
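
A minimal end-to-end sketch, assuming vipy.globals.parallel is usable as a context manager (per the module's warning message) and that vipy.image.RandomImage is available:

import vipy
from vipy.batch import Batch

with vipy.globals.parallel(4):                  # local Dask client with four worker processes
    b = Batch([vipy.image.RandomImage(64, 64) for k in range(0, 100)], ordered=True)
    b.map(lambda im: im.rgb())                  # applied to each element in parallel
    results = b.result()                        # input order preserved since ordered=True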

Methods

def filter(self, f_lambda)

Run the lambda function on each element of the batch and keep only those elements for which the lambda returns true.
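
A minimal local sketch (no parallel client configured, so the filter runs serially; it assumes vipy.image.RandomImage is available):

import vipy
from vipy.batch import Batch

b = Batch([vipy.image.RandomImage(64, 64) for k in range(0, 10)])   # homogeneous batch
b.filter(lambda im: im.width() >= 64)     # keep elements for which the lambda returns True
print(len(b.result()))                    # number of surviving elements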

def map(self, f_lambda, args=None)

Run the lambda function on each of the elements of the batch and return the batch object.

>>> iml = [vipy.image.RandomScene(512,512) for k in range(0,1000)]   
>>> imb = vipy.batch.Batch(iml) 
>>> imb.map(lambda im: im.rgb())  

If args is provided, it must be a list with one element per batch element; each element is passed to the lambda as an additional positional argument (tuples are unpacked).

The lambda function f_lambda should not include closures, since the lambda is serialized (with dill) and shipped to worker processes that do not share the caller's scope. If it does, construct the lambda with default parameter capture:

>>> f = lambda x, prm1=42: x+prm1

instead of:

>>> prm1 = 42
>>> f = lambda x: x+prm1
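
A small self-contained illustration of the difference (plain Python, independent of vipy):

prm1 = 42
f_closure = lambda x: x + prm1             # captures prm1 by reference; breaks when shipped to a worker
f_capture = lambda x, prm1=prm1: x + prm1  # captures the current value of prm1 at definition time
prm1 = 0
print(f_closure(1))   # 1  - follows the rebound global
print(f_capture(1))   # 43 - the default-captured value is unchanged
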
def result(self)

Return the result of the batch processing, restoring the original order if the batch was constructed with ordered=True

def scattermap(self, f_lambda, obj)

Scatter obj to all workers, and apply the lambda function f(obj, im) to each element in the batch

Usage:

>>> Batch(mylist).scattermap(lambda net, im: net(im), net).result()

This will scatter the large object net to all workers and apply the lambda to each element of the batch. To pin each worker to a specific GPU, construct the client with vipy.batch.Dask(num_gpus=n); within the net object, you can then call vipy.globals.gpuindex() to retrieve the GPU index assigned to the current worker, which can be used by torch.cuda.device(). Each net is replicated in a different process, so it is the caller's responsibility to read vipy.globals.gpuindex() from within the process and set up net to use that GPU rather than the default cuda:0.
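
A hedged sketch of this pattern; MyNet is a hypothetical wrapper, and it assumes vipy.globals.gpuindex() with no arguments returns the index previously assigned to the worker by Dask(num_gpus=n):

import torch
import vipy
from vipy.batch import Batch

class MyNet(object):
    """Hypothetical wrapper that moves its model to the worker's assigned GPU on first call"""
    def __init__(self):
        self._model = torch.nn.Linear(8, 2)

    def __call__(self, x):
        k = vipy.globals.gpuindex()  # assumption: returns the worker's GPU index, or None if unset
        device = torch.device('cuda:%d' % k) if (k is not None and torch.cuda.is_available()) else torch.device('cpu')
        self._model = self._model.to(device)
        return self._model(x.to(device))

# net is scattered (broadcast) once to every worker, then applied to each element:
# >>> Batch([torch.randn(8) for k in range(0, 100)]).scattermap(lambda net, x: net(x), MyNet()).result()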

class Dask (num_processes=None, dashboard=False, verbose=False, address=None, num_gpus=None)

Dask distributed client
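
A minimal local construction sketch, following the signature above:

from vipy.batch import Dask

d = Dask(num_processes=4)    # local scheduler, four single-threaded worker processes
print(d.num_processes())     # 4
print(d.has_dashboard())     # False unless constructed with dashboard=True
d.shutdown()                 # close the client and clear vipy.globals.GLOBAL['DASK_CLIENT']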

Methods

def client(self)

Return the underlying dask.distributed.Client.

def dashboard(self)

Open the dashboard link in a web browser, if the client has one.

def has_dashboard(self)

Return true if the client has a non-empty dashboard link.

def num_gpus(self)

Return the number of GPUs requested at construction, or None.

def num_processes(self)

Return the number of worker processes, or zero if the client has been shut down.

def shutdown(self)

Close the client and clear the global dask client state.
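
To connect to an already-running distributed scheduler, pass address instead of num_processes; a sketch with a placeholder address (8786 is the default dask scheduler port). As noted in the constructor comments in the source above, VIPY_* environment variables are forwarded to the remote workers, so paths such as VIPY_CACHE must be valid on every machine in the cluster:

from vipy.batch import Dask

d = Dask(address='scheduler.example.com:8786')   # placeholder HOSTNAME:PORT
print(d.client())                                # the underlying dask.distributed.Client
d.shutdown()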