Module vipy.globals

import os
import webbrowser
import tempfile
import builtins
import logging 
import concurrent.futures 


# Global mutable dictionary
GLOBAL = {'DASK_CLIENT': None,   # Global Dask() client for distributed processing
          'CONCURRENT_FUTURES':None,  # global futures client 
          'CACHE':os.environ['VIPY_CACHE'] if 'VIPY_CACHE' in os.environ else None,   # Cache directory for vipy.video and vipy.image downloads
          'LOGGER':logging.getLogger('vipy'),     # The global logger
          'DEBUG':False, # globally enable debugging flags
          'GUI':{'escape':False},
          'AWS':{'AWS_ACCESS_KEY_ID':os.environ['VIPY_AWS_ACCESS_KEY_ID'] if 'VIPY_AWS_ACCESS_KEY_ID' in os.environ else None,
                 'AWS_SECRET_ACCESS_KEY':os.environ['VIPY_AWS_SECRET_ACCESS_KEY'] if 'VIPY_AWS_SECRET_ACCESS_KEY' in os.environ else None,
                 'AWS_SESSION_TOKEN':os.environ['VIPY_AWS_SESSION_TOKEN'] if 'VIPY_AWS_SESSION_TOKEN' in os.environ else None},
          'LATEX':os.environ['VIPY_LATEX'] if 'VIPY_LATEX' in os.environ else None}

log = GLOBAL['LOGGER']

def logger():
    return GLOBAL['LOGGER']


class Dask(object):
    """Dask distributed client"""
    
    def __init__(self, num_workers=None, threaded=True, dashboard=False, verbose=False, address=None):
        from vipy.util import try_import
        try_import('dask', 'dask distributed'); import dask, dask.distributed;
    
        assert address is not None or num_workers is not None, "must provide either num_workers or address"

        self._num_workers = num_workers
        self._has_dashboard = dashboard
        
        # Dask configuration: https://docs.dask.org/en/latest/configuration.html
        # - when using vipy.dataset.Dataset minibatch iterator, large minibatches can result in a warning about large graphs
        # - The end user can set these environment variables; defaults are applied here only if they are not already set
        if 'DASK_LOGGING__DISTRIBUTED' not in os.environ:
            os.environ['DASK_LOGGING__DISTRIBUTED'] = 'warning' if not verbose else 'info'
        if 'DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT' not in os.environ:
            os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT'] = "30s"
        if 'DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP' not in os.environ:
            os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP'] = "30s"
        if 'DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT' not in os.environ:
            os.environ['DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT'] = "30s"
        if 'DASK_DISTRIBUTED__COMM__RETRY__COUNT' not in os.environ:
            os.environ['DASK_DISTRIBUTED__COMM__RETRY__COUNT'] = "10"        
        if 'DASK_ADMIN_LARGE_GRAPH_WARNING_THRESHOLD' not in os.environ:
            os.environ['DASK_ADMIN_LARGE_GRAPH_WARNING_THRESHOLD'] = "50MB"

        dask.config.refresh()
        
        dask.config.set({'DISTRIBUTED.COMM.RETRY.COUNT'.lower():int(os.environ['DASK_DISTRIBUTED__COMM__RETRY__COUNT'])})
        dask.config.set({'DISTRIBUTED.COMM.TIMEOUTS.CONNECT'.lower():os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT']})
        dask.config.set({'DISTRIBUTED.COMM.TIMEOUTS.TCP'.lower():os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP']})
        dask.config.set({'DISTRIBUTED.DEPLOY.LOST_WORKER_TIMEOUT'.lower():os.environ['DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT']})        
        dask.config.set({"distributed.admin.large-graph-warning-threshold": os.environ['DASK_ADMIN_LARGE_GRAPH_WARNING_THREHSOLD']})
        
        
        # Worker env
        env = {'VIPY_BACKEND':'Agg',  # headless in workers
               'PYTHONPATH':os.environ['PYTHONPATH'] if 'PYTHONPATH' in os.environ else '',
               'PATH':os.environ['PATH'] if 'PATH' in os.environ else ''}

        if 'VIPY_CACHE' in os.environ:
            env.update({'VIPY_CACHE':os.environ['VIPY_CACHE']})
        if 'VIPY_AWS_ACCESS_KEY_ID' in os.environ:
            env.update({'VIPY_AWS_ACCESS_KEY_ID':os.environ['VIPY_AWS_ACCESS_KEY_ID']})            
        if 'VIPY_AWS_SECRET_ACCESS_KEY' in os.environ:
            env.update({'VIPY_AWS_SECRET_ACCESS_KEY':os.environ['VIPY_AWS_SECRET_ACCESS_KEY']})        
                    
        for (k,v) in os.environ.items():
            if k.startswith('DASK_'):
                env[k] = v
    
        if address is not None:
            # Distributed scheduler
            self._client = dask.distributed.Client(name='vipy', address=address)

            # Update key environment variables on remote workers using out of band function (yuck)
            # Make sure that any environment variables are accessible on all machines!  (e.g. VIPY_CACHE)
            # If not, then you need to unset these variables from os.environ prior to calling Dask()
            def _f_setenv_remote(localenv):
                import os; os.environ.update(localenv)

            localenv = {k:v for (k,v) in os.environ.items() if k.startswith('VIPY_')}
            localenv.update( {'VIPY_BACKEND':'Agg'} )
            self._client.run(lambda env=localenv: _f_setenv_remote(env))

        else:
            kwargs = {'name':'vipy',
                      'address':address,  # to connect to distributed scheduler HOSTNAME:PORT
                      'scheduler_port':0,   # random
                      'dashboard_address':None if not dashboard else ':0',  # random port
                      'processes':not threaded,
                      'threads_per_worker':1,
                      'n_workers':num_workers,
                      'local_directory':tempfile.mkdtemp()}
            kwargs.update({'env':env, 'direct_to_workers':True} if not threaded else {})
            
            # Local scheduler
            self._client = dask.distributed.Client(**kwargs)
            

    def __repr__(self):
        if self._num_workers is not None:
            # Local 
            return str('<vipy.globals.Dask: %s%s>' % ('workers=%d' % self.num_workers(), ', dashboard=%s' % str(self._client.dashboard_link) if self._has_dashboard else ''))
        elif self._client is not None:
            # Distributed
            return str('<vipy.globals.Dask: %s>' % (str(self._client)))
        else:
            return str('<vipy.globals.Dask: shutdown>')

    def num_workers(self):
        return self._num_workers

    def shutdown(self):
        self._client.close()
        self._num_workers = None
        GLOBAL['DASK_CLIENT'] = None        
        return self

    def client(self):
        return self._client



def cache(cachedir=None):
    """The cache is the location that URLs are downloaded to on your system.  This can be set here, or with the environment variable VIPY_CACHE

    >>> vipy.globals.cache('/path/to/.vipy')
    >>> cachedir = vipy.globals.cache()

    Args:
        cachedir:  the location to store cached files when downloaded.  Can also be set using the VIPY_CACHE environment variable.  If None, return the current cachedir
    
    Returns:
        The current cachedir, or None if no cache directory has been set
    
    """
    if cachedir is not None:
        from vipy.util import remkdir        
        os.environ['VIPY_CACHE'] = remkdir(cachedir)
        GLOBAL['CACHE'] = cachedir
    return os.environ['VIPY_CACHE'] if 'VIPY_CACHE' in os.environ else None
    

def _user_hit_escape(b=None):
    """Did the user hit the escape key?  Useful for matplotlib GUI to stop displaying video"""
    if b is None:
        if GLOBAL['GUI']['escape']:
            GLOBAL['GUI']['escape'] = False  # toggle it
            return True
        else:
            return False
    else:
        # Set in vipy.gui.using_matplotlib.escape_to_exit()
        assert isinstance(b, bool)
        GLOBAL['GUI']['escape'] = b  

        
def cf(num_workers=None, threaded=True):
    if num_workers is not None:
        if GLOBAL['CONCURRENT_FUTURES']:
            GLOBAL['CONCURRENT_FUTURES'].shutdown()
            
        GLOBAL['CONCURRENT_FUTURES'] = (concurrent.futures.ThreadPoolExecutor(max_workers=num_workers, thread_name_prefix='vipy') if threaded else
                                        concurrent.futures.ProcessPoolExecutor(max_workers=num_workers))
    return GLOBAL['CONCURRENT_FUTURES']
                                                 
    
def dask(num_workers=None, dashboard=False, address=None, pct=None, threaded=True):
    """Return the current Dask client, can be accessed globally for parallel processing.
    
    Args:
        pct: float in [0,1] the percentage of the current machine to use
        address:  the dask scheduler of the form 'HOSTNAME:PORT'
        num_workers:  the number of processes to use on the current machine
        dashboard: [bool] whether to initialize the dask client with a web dashboard
        threaded: [bool] if true, create threaded workers instead of processes

    Returns:
        The `vipy.globals.Dask` object pointing to the Dask distributed client
    """
    if pct is not None:
        assert pct > 0 and pct <= 1
        num_workers = int(pct*os.cpu_count())
    if address is not None or num_workers is not None:
        if GLOBAL['DASK_CLIENT']:
            GLOBAL['DASK_CLIENT'].shutdown()
        GLOBAL['DASK_CLIENT'] = Dask(num_workers, threaded=threaded, dashboard=dashboard, verbose=False, address=address)        
    return GLOBAL['DASK_CLIENT']


def parallel(workers=None, pct=None, threaded=True):
    """Enable parallel processing with n>=1 processes or a percentage of system core (pct in [0,1])  .

    This can be be used as a context manager
    
    >>> with vipy.globals.parallel(n=4):
    >>>     vipy.batch.Batch(...)

    or using the global variables:

    >>> vipy.globals.parallel(n=4):
    >>> vipy.batch.Batch(...)
    >>> vipy.globals.noparallel()
    
    To check the current parallelism level:
    
    >>> num_workers = vipy.globals.parallel().num_workers()

    To run with a distributed dask scheduler, use `vipy.globals.dask`:
    
    >>> vipy.globals.dask(address='10.0.1.1:8585')
    >>> vipy.batch.Batch(...)

    Args:
        workers: [int] number of parallel workers
        pct: [float] the percentage [0,1] of system cores to dedicate to parallel processing
        threaded: [bool] if false, use processes instead of threads (not recommended, since vipy parallel processing usually releases the GIL)
    """

    class Parallel():
        def __init__(self, workers):
            self._workers = workers
            self._threaded = threaded
            self.start()
            
        def __enter__(self):
            return self
            
        def __exit__(self, *args):            
            self.shutdown() 
            
        def __repr__(self):
            return '<vipy.globals.parallel: workers=%s, cf=%s>' % (self.num_workers(), GLOBAL['CONCURRENT_FUTURES'] if GLOBAL['CONCURRENT_FUTURES'] else 'stopped')

        def start(self):
            if not GLOBAL['CONCURRENT_FUTURES'] and self._workers is not None and self._workers > 0:
                cf(num_workers=self._workers, threaded=self._threaded)
            GLOBAL['LOGGER'].info('Parallel executor initialized %s' % self)
            return self
        
        def shutdown(self):
            if GLOBAL['CONCURRENT_FUTURES']:
                GLOBAL['LOGGER'].info('Parallel executor shutdown %s' % self)                            
                GLOBAL['CONCURRENT_FUTURES'].shutdown(wait=True)
            GLOBAL['CONCURRENT_FUTURES'] = None
        
        def num_workers(self):
            return self._workers

    return Parallel(workers if not pct else int(pct*os.cpu_count()))

def multithreading(n=None, pct=None):
    """Context manager for concurrent futures multithreaded executor, use with `vipy.dataset.Dataset`"""
    return parallel(workers=n, pct=pct, threaded=True)

def multiprocessing(n=None, pct=None):
    """Context manager for concurrent futures multiprocessing executor, use with `vipy.dataset.Dataset`"""    
    return parallel(workers=n, pct=pct, threaded=False)

def noparallel():
    """Disable all parallel processing"""
    if GLOBAL['DASK_CLIENT'] is not None:
        GLOBAL['DASK_CLIENT'].shutdown()
        del GLOBAL['DASK_CLIENT']
        GLOBAL['LOGGER'].info('Parallel executor shutdown')
    if GLOBAL['CONCURRENT_FUTURES']:
        GLOBAL['CONCURRENT_FUTURES'].shutdown()
        
    GLOBAL['CONCURRENT_FUTURES'] = None
    GLOBAL['DASK_CLIENT'] = None 
    
    
def shutdown():
    """Alias for `vipy.globals.noparallel`"""    
    return noparallel()

Functions

def cache(cachedir=None)

The cache is the location that URLs are downloaded to on your system. This can be set here, or with the environment variable VIPY_CACHE

>>> vipy.globals.cache('/path/to/.vipy')
>>> cachedir = vipy.globals.cache()

Args

cachedir
the location to store cached files when downloaded. Can also be set using the VIPY_CACHE environment variable. If None, return the current cachedir

Returns

The current cachedir, or None if no cache directory has been set

def cache(cachedir=None):
    """The cache is the location that URLs are downloaded to on your system.  This can be set here, or with the environment variable VIPY_CACHE

    >>> vipy.globals.cache('/path/to/.vipy')
    >>> cachedir = vipy.globals.cache()

    Args:
        cachedir:  the location to store cached files when downloaded.  Can also be set using the VIPY_CACHE environment variable.  If None, return the current cachedir
    
    Returns:
        The current cachedir, or None if no cache directory has been set
    
    """
    if cachedir is not None:
        from vipy.util import remkdir        
        os.environ['VIPY_CACHE'] = remkdir(cachedir)
        GLOBAL['CACHE'] = cachedir
    return os.environ['VIPY_CACHE'] if 'VIPY_CACHE' in os.environ else None
def cf(num_workers=None, threaded=True)
def cf(num_workers=None, threaded=True):
    if num_workers is not None:
        if GLOBAL['CONCURRENT_FUTURES']:
            GLOBAL['CONCURRENT_FUTURES'].shutdown()
            
        GLOBAL['CONCURRENT_FUTURES'] = (concurrent.futures.ThreadPoolExecutor(max_workers=num_workers, thread_name_prefix='vipy') if threaded else
                                        concurrent.futures.ProcessPoolExecutor(max_workers=num_workers))
    return GLOBAL['CONCURRENT_FUTURES']
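
cf returns the shared concurrent.futures executor, creating a new one when num_workers is given (threads by default, processes if threaded=False). A minimal usage sketch, where the worker function f and the iterable items are hypothetical placeholders:

>>> executor = vipy.globals.cf(num_workers=4)
>>> futures = [executor.submit(f, x) for x in items]
>>> results = [g.result() for g in futures]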
def dask(num_workers=None, dashboard=False, address=None, pct=None, threaded=True)

Return the current Dask client, which can be accessed globally for parallel processing.

Args

pct
float in [0,1] the percentage of the current machine to use
address
the dask scheduler of the form 'HOSTNAME:PORT'
num_workers
the number of processes to use on the current machine
dashboard
[bool] whether to initialize the dask client with a web dashboard
threaded
[bool] if true, create threaded workers instead of processes

Returns

The vipy.globals.Dask object pointing to the Dask distributed client

def dask(num_workers=None, dashboard=False, address=None, pct=None, threaded=True):
    """Return the current Dask client, can be accessed globally for parallel processing.
    
    Args:
        pct: float in [0,1] the percentage of the current machine to use
        address:  the dask scheduler of the form 'HOSTNAME:PORT'
        num_workers:  the number of processes to use on the current machine
        dashboard: [bool] whether to initialize the dask client with a web dashboard
        threaded: [bool] if true, create threaded workers instead of processes

    Returns:
        The `vipy.globals.Dask` object pointing to the Dask distributed client
    """
    if pct is not None:
        assert pct > 0 and pct <= 1
        num_workers = int(pct*os.cpu_count())
    if address is not None or num_workers is not None:
        if GLOBAL['DASK_CLIENT']:
            GLOBAL['DASK_CLIENT'].shutdown()
        GLOBAL['DASK_CLIENT'] = Dask(num_workers, threaded=threaded, dashboard=dashboard, verbose=False, address=address)        
    return GLOBAL['DASK_CLIENT']
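
For example, to start a local client with four threaded workers and retrieve the underlying dask.distributed client (a sketch; the same object is also stored in GLOBAL['DASK_CLIENT']):

>>> d = vipy.globals.dask(num_workers=4)
>>> d.num_workers()
4
>>> client = d.client()  # the underlying dask.distributed.Client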
def logger()
def logger():
    return GLOBAL['LOGGER']
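
The returned logger is the standard logging.Logger named 'vipy', so the usual logging API applies:

>>> vipy.globals.logger().info('hello from vipy')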
def multiprocessing(n=None, pct=None)

Context manager for concurrent futures multiprocessing executor, use with Dataset

def multiprocessing(n=None, pct=None):
    """Context manager for concurrent futures multiprocessing executor, use with `vipy.dataset.Dataset`"""    
    return parallel(workers=n, pct=pct, threaded=False)
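
A usage sketch as a context manager (process workers avoid the GIL but add pickling overhead, so threads are usually sufficient since vipy parallel processing releases the GIL):

>>> with vipy.globals.multiprocessing(n=4):
>>>     vipy.batch.Batch(...)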
def multithreading(n=None, pct=None)

Context manager for concurrent futures multithreaded executor, use with Dataset

def multithreading(n=None, pct=None):
    """Context manager for concurrent futures multithreaded executor, use with `vipy.dataset.Dataset`"""
    return parallel(workers=n, pct=pct, threaded=True)
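
A usage sketch as a context manager, which shuts the executor down on exit:

>>> with vipy.globals.multithreading(n=8):
>>>     vipy.batch.Batch(...)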
def noparallel()

Disable all parallel processing

def noparallel():
    """Disable all parallel processing"""
    if GLOBAL['DASK_CLIENT'] is not None:
        GLOBAL['DASK_CLIENT'].shutdown()
        del GLOBAL['DASK_CLIENT']
        GLOBAL['LOGGER'].info('Parallel executor shutdown')
    if GLOBAL['CONCURRENT_FUTURES']:
        GLOBAL['CONCURRENT_FUTURES'].shutdown()
        
    GLOBAL['CONCURRENT_FUTURES'] = None
    GLOBAL['DASK_CLIENT'] = None 
def parallel(workers=None, pct=None, threaded=True)

Enable parallel processing with workers>=1 processes or a percentage of the system cores (pct in [0,1]).

This can be used as a context manager

>>> with vipy.globals.parallel(workers=4):
>>>     vipy.batch.Batch(...)

or using the global variables:

>>> vipy.globals.parallel(workers=4)
>>> vipy.batch.Batch(...)
>>> vipy.globals.noparallel()

To check the current parallelism level:

>>> num_workers = vipy.globals.parallel().num_workers()

To run with a distributed dask scheduler, use vipy.globals.dask:

>>> vipy.globals.dask(address='10.0.1.1:8585')
>>> vipy.batch.Batch(...)

Args

workers
[int] number of parallel workers
pct
[float] the percentage [0,1] of system cores to dedicate to parallel processing

threaded
[bool] if false, use processes instead of threads (not recommended, since vipy parallel processing usually releases the GIL)

def parallel(workers=None, pct=None, threaded=True):
    """Enable parallel processing with n>=1 processes or a percentage of system core (pct in [0,1])  .

    This can be be used as a context manager
    
    >>> with vipy.globals.parallel(n=4):
    >>>     vipy.batch.Batch(...)

    or using the global variables:

    >>> vipy.globals.parallel(n=4):
    >>> vipy.batch.Batch(...)
    >>> vipy.globals.noparallel()
    
    To check the current parallelism level:
    
    >>> num_workers = vipy.globals.parallel().num_workers()

    To run with a distributed dask scheduler, use `vipy.globals.dask`:
    
    >>> vipy.globals.dask(address='10.0.1.1:8585')
    >>> vipy.batch.Batch(...)

    Args:
        workers: [int] number of parallel workers
        pct: [float] the percentage [0,1] of system cores to dedicate to parallel processing
        threaded: [bool] if false, use processes instead of threads (not recommended, since vipy parallel processing usually releases the GIL)
    """

    class Parallel():
        def __init__(self, workers):
            self._workers = workers
            self._threaded = threaded
            self.start()
            
        def __enter__(self):
            return self
            
        def __exit__(self, *args):            
            self.shutdown() 
            
        def __repr__(self):
            return '<vipy.globals.parallel: workers=%s, cf=%s>' % (self.num_workers(), GLOBAL['CONCURRENT_FUTURES'] if GLOBAL['CONCURRENT_FUTURES'] else 'stopped')

        def start(self):
            if not GLOBAL['CONCURRENT_FUTURES'] and self._workers is not None and self._workers > 0:
                cf(num_workers=self._workers, threaded=self._threaded)
            GLOBAL['LOGGER'].info('Parallel executor initialized %s' % self)
            return self
        
        def shutdown(self):
            if GLOBAL['CONCURRENT_FUTURES']:
                GLOBAL['LOGGER'].info('Parallel executor shutdown %s' % self)                            
                GLOBAL['CONCURRENT_FUTURES'].shutdown(wait=True)
            GLOBAL['CONCURRENT_FUTURES'] = None
        
        def num_workers(self):
            return self._workers

    return Parallel(workers if not pct else int(pct*os.cpu_count()))
def shutdown()

Alias for noparallel()

def shutdown():
    """Alias for `vipy.globals.noparallel`"""    
    return noparallel()

Classes

class Dask (num_workers=None, threaded=True, dashboard=False, verbose=False, address=None)

Dask distributed client

class Dask(object):
    """Dask distributed client"""
    
    def __init__(self, num_workers=None, threaded=True, dashboard=False, verbose=False, address=None):
        from vipy.util import try_import
        try_import('dask', 'dask distributed'); import dask, dask.distributed;
    
        assert address is not None or num_workers is not None, "must provide either num_workers or address"

        self._num_workers = num_workers
        self._has_dashboard = dashboard
        
        # Dask configuration: https://docs.dask.org/en/latest/configuration.html
        # - when using vipy.dataset.Dataset minibatch iterator, large minibatches can result in a warning about large graphs
        # - The end user can set these environment variables; defaults are applied here only if they are not already set
        if 'DASK_LOGGING__DISTRIBUTED' not in os.environ:
            os.environ['DASK_LOGGING__DISTRIBUTED'] = 'warning' if not verbose else 'info'
        if 'DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT' not in os.environ:
            os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT'] = "30s"
        if 'DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP' not in os.environ:
            os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP'] = "30s"
        if 'DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT' not in os.environ:
            os.environ['DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT'] = "30s"
        if 'DASK_DISTRIBUTED__COMM__RETRY__COUNT' not in os.environ:
            os.environ['DASK_DISTRIBUTED__COMM__RETRY__COUNT'] = "10"        
        if 'DASK_ADMIN_LARGE_GRAPH_WARNING_THRESHOLD' not in os.environ:
            os.environ['DASK_ADMIN_LARGE_GRAPH_WARNING_THRESHOLD'] = "50MB"

        dask.config.refresh()
        
        dask.config.set({'DISTRIBUTED.COMM.RETRY.COUNT'.lower():int(os.environ['DASK_DISTRIBUTED__COMM__RETRY__COUNT'])})
        dask.config.set({'DISTRIBUTED.COMM.TIMEOUTS.CONNECT'.lower():os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT']})
        dask.config.set({'DISTRIBUTED.COMM.TIMEOUTS.TCP'.lower():os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP']})
        dask.config.set({'DISTRIBUTED.DEPLOY.LOST_WORKER_TIMEOUT'.lower():os.environ['DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT']})        
        dask.config.set({"distributed.admin.large-graph-warning-threshold": os.environ['DASK_ADMIN_LARGE_GRAPH_WARNING_THREHSOLD']})
        
        
        # Worker env
        env = {'VIPY_BACKEND':'Agg',  # headless in workers
               'PYTHONPATH':os.environ['PYTHONPATH'] if 'PYTHONPATH' in os.environ else '',
               'PATH':os.environ['PATH'] if 'PATH' in os.environ else ''}

        if 'VIPY_CACHE' in os.environ:
            env.update({'VIPY_CACHE':os.environ['VIPY_CACHE']})
        if 'VIPY_AWS_ACCESS_KEY_ID' in os.environ:
            env.update({'VIPY_AWS_ACCESS_KEY_ID':os.environ['VIPY_AWS_ACCESS_KEY_ID']})            
        if 'VIPY_AWS_SECRET_ACCESS_KEY' in os.environ:
            env.update({'VIPY_AWS_SECRET_ACCESS_KEY':os.environ['VIPY_AWS_SECRET_ACCESS_KEY']})        
                    
        for (k,v) in os.environ.items():
            if k.startswith('DASK_'):
                env[k] = v
    
        if address is not None:
            # Distributed scheduler
            self._client = dask.distributed.Client(name='vipy', address=address)

            # Update key environment variables on remote workers using out of band function (yuck)
            # Make sure that any environment variables are accessible on all machines!  (e.g. VIPY_CACHE)
            # If not, then you need to unset these variables from os.environ prior to calling Dask()
            def _f_setenv_remote(localenv):
                import os; os.environ.update(localenv)

            localenv = {k:v for (k,v) in os.environ.items() if k.startswith('VIPY_')}
            localenv.update( {'VIPY_BACKEND':'Agg'} )
            self._client.run(lambda env=localenv: _f_setenv_remote(env))

        else:
            kwargs = {'name':'vipy',
                      'address':address,  # to connect to distributed scheduler HOSTNAME:PORT
                      'scheduler_port':0,   # random
                      'dashboard_address':None if not dashboard else ':0',  # random port
                      'processes':not threaded,
                      'threads_per_worker':1,
                      'n_workers':num_workers,
                      'local_directory':tempfile.mkdtemp()}
            kwargs.update({'env':env, 'direct_to_workers':True} if not threaded else {})
            
            # Local scheduler
            self._client = dask.distributed.Client(**kwargs)
            

    def __repr__(self):
        if self._num_workers is not None:
            # Local 
            return str('<vipy.globals.Dask: %s%s>' % ('workers=%d' % self.num_workers(), ', dashboard=%s' % str(self._client.dashboard_link) if self._has_dashboard else ''))
        elif self._client is not None:
            # Distributed
            return str('<vipy.globals.Dask: %s>' % (str(self._client)))
        else:
            return str('<vipy.globals.Dask: shutdown>')

    def num_workers(self):
        return self._num_workers

    def shutdown(self):
        self._client.close()
        self._num_workers = None
        GLOBAL['DASK_CLIENT'] = None        
        return self

    def client(self):
        return self._client

Methods

def client(self)
def client(self):
    return self._client
def num_workers(self)
def num_workers(self):
    return self._num_workers
def shutdown(self)
def shutdown(self):
    self._client.close()
    self._num_workers = None
    GLOBAL['DASK_CLIENT'] = None        
    return self
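
While vipy.globals.dask() is the usual entry point, the class can also be constructed directly; a minimal local-scheduler sketch:

>>> d = Dask(num_workers=2, threaded=True)
>>> d.num_workers()
2
>>> d.shutdown()  # closes the client and clears GLOBAL['DASK_CLIENT']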