Module vipy.dataset
import os
import numpy as np
from vipy.util import findpkl, toextension, filepath, filebase, jsonlist, ishtml, ispkl, filetail, temphtml, env, cache
from vipy.util import listpkl, listext, templike, tempdir, remkdir, tolist, fileext, writelist, tempcsv, to_iterable
from vipy.util import newpathroot, listjson, extlist, filefull, tempdir, groupbyasdict, try_import, shufflelist, catcher
from vipy.util import is_email_address, findjson, findimages, isimageurl, countby, chunkgen, chunkgenbysize, dividelist, chunklist
from vipy.util import findvideos, truncate_string
from vipy.globals import log
import random
import shutil
import copy
import gc
import itertools
from pathlib import Path
import functools
import concurrent.futures as cf
import vipy.parallel
class Dataset():
"""vipy.dataset.Dataset() class
Common class to manipulate large sets of objects in parallel
Args:
- dataset [list, tuple, set, obj]: a python built-in type that supports indexing or a generic object that supports indexing and has a length
- id [str]: an optional id of this dataset, which provides a descriptive name of the dataset
- loader [callable]: a callable loader that will construct the object from a raw data element in the dataset. This is useful for custom deserialization or on-demand transformations
Datasets can be indexed, shuffled, iterated, minibatched, sorted, sampled, partitioned.
Datasets constructed of vipy objects are lazily loaded, delaying loading pixels until they are needed
```python
(trainset, valset, testset) = vipy.dataset.registry('mnist')
(trainset, valset) = trainset.partition(0.9, 0.1)
categories = trainset.set(lambda im: im.category())
smaller = testset.take(1024)
preprocessed = smaller.map(lambda im: im.resize(32, 32).gain(1/256))
for b in preprocessed.minibatch(128):
print(b)
# visualize the dataset
(trainset, valset, testset) = vipy.dataset.registry('pascal_voc_2007')
for im in trainset:
im.mindim(1024).show().print(sleep=1).close()
```
Datasets can be constructed from directories of json files or image files (`vipy.dataset.Dataset.from_directory`)
Datasets can be constructed from a single json file containing a list of objects (`vipy.dataset.Dataset.from_json`)
.. note:: If a lambda function is provided as the loader then this dataset is not serializable. Use self.load() to load into memory, then serialize.
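For example, a dataset can be constructed from raw records with a custom loader (a minimal sketch; the record fields and the `loads` helper are hypothetical):
```python
records = [{'filename': '/path/to/image.jpg', 'category': 'dog'}]   # hypothetical raw elements
def loads(r):
    return vipy.image.ImageCategory(filename=r['filename'], category=r['category'])   # construct on access
D = vipy.dataset.Dataset(records, id='my_dataset', loader=loads)   # a named loader keeps the dataset serializable
```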
"""
__slots__ = ('_id', '_ds', '_idx', '_loader', '_type')
def __init__(self, dataset, id=None, loader=None):
assert loader is None or callable(loader)
self._id = id
self._ds = dataset if not isinstance(dataset, Dataset) else dataset._ds
self._idx = None if not isinstance(dataset, Dataset) else dataset._idx # random access on-demand
self._loader = loader if not isinstance(dataset, Dataset) else dataset._loader # not serializable if lambda is provided
try:
self._type = str(type(self._loader(dataset[0]) if self._loader else dataset[0])) # peek at first element, cached
except:
self._type = None
@classmethod
def from_directory(cls, indir, filetype='json', id=None):
"""Recursively search indir for filetype, construct a dataset from all discovered files of that type"""
if filetype == 'json':
return cls([x for f in findjson(indir) for x in to_iterable(vipy.load(f))], id=id)
elif filetype.lower() in ['jpg','jpeg','images']:
return cls([vipy.image.Image(filename=f) for f in findimages(indir)], id=id)
elif filetype.lower() in ['mp4','videos']:
return cls([vipy.video.Video(filename=f) for f in findvideos(indir)], id=id)
else:
raise ValueError('unsupported file type "%s"' % filetype)
@classmethod
def from_image_urls(cls, urls, id=None):
"""Construct a dataset from a list of image URLs"""
return cls([vipy.image.Image(url=url) for url in to_iterable(urls) if isimageurl(url)], id=id)
@classmethod
def from_json(cls, jsonfile, id=None):
return cls([x for x in to_iterable(vipy.load(jsonfile))], id=id)
@classmethod
def cast(cls, obj):
return cls(obj) if not isinstance(obj, Dataset) else obj
def __repr__(self):
fields = ['id=%s' % truncate_string(self.id(), maxlen=80)] if self.id() else []
fields += ['len=%d' % self.len()] if self.len() is not None else []
fields += ['type=%s' % self._type] if self._type else []
return str('<vipy.dataset.Dataset: %s>' % ', '.join(fields))
def __iter__(self):
if self.is_streaming():
for x in self._ds: # iterable access (faster)
yield self._loader(x) if self._loader is not None else x
else:
for k in range(len(self)):
yield self[k] # random access (slower)
def __getitem__(self, k):
assert self.len() is not None, "dataset does not support indexing"
idx = self.index() # convert to random access on demand
if isinstance(k, (int, np.uint64)):
assert abs(k) < len(idx), "invalid index"
x = self._ds[idx[int(k)]]
x = self._loader(x) if self._loader is not None else x
return x
elif isinstance(k, slice):
X = [self._ds[k] for k in idx[k.start:k.stop:k.step]]
X = [self._loader(x) for x in X] if self._loader is not None else X
return X
else:
raise ValueError('invalid slice "%s"' % type(k))
def raw(self):
"""Return a view of this dataset without the loader"""
return Dataset(self._ds, loader=None)
def is_streaming(self):
return self._idx is None
def len(self):
return len(self._idx) if self._idx is not None else (len(self._ds) if hasattr(self._ds, '__len__') else None)
def __len__(self):
len = self.len()
if len is None:
raise ValueError('dataset has no length')
return len
def __or__(self, other):
assert isinstance(other, Dataset)
return Union(self, other, id=self.id())
def id(self, new_id=None):
"""Change the dataset ID to the provided ID, or return it if None"""
if new_id is not None:
self._id = new_id
return self
return self._id
def index(self, index=None, strict=False):
"""Update the index, useful for filtering of large datasets"""
if index is not None:
assert not strict or index is None or (len(index)>0 and len(index)<=len(self) and max(index)<len(self) and min(index)>=0)
self._idx = index
return self
if self._idx is None:
self._idx = list(range(len(self._ds))) # on-demand index, only if underlying dataset has known length
return self._idx
def clone(self, deep=False):
"""Return a copy of the dataset object"""
if not deep:
return copy.copy(self)
else:
return copy.deepcopy(self)
def shuffle(self, shuffler=None):
"""Permute elements in this dataset uniformly at random in place using the optimal shuffling strategy for the dataset structure to maximize performance.
This method will use either Dataset.streaming_shuffler (for iterable datasets) or Dataset.uniform_shuffler (for random access datasets)
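A custom shuffler may also be provided as a callable that takes and returns the dataset; for example, `Dataset.identity_shuffler` leaves the order unchanged:
```python
D = vipy.dataset.Dataset(...)
D.shuffle()                                          # default: best shuffler for the dataset structure
D.shuffle(vipy.dataset.Dataset.identity_shuffler)    # custom shuffler: order unchanged
```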
"""
assert shuffler is None or callable(shuffler)
shuffler = shuffler if shuffler is not None else (Dataset.streaming_shuffler if self.is_streaming() else Dataset.uniform_shuffler)
return shuffler(self)
def repeat(self, n):
"""Repeat the dataset n times. If n=0, the dataset is unchanged, if n=1 the dataset is doubled in length, etc."""
assert n>=0
return self.index( self.index()*(n+1) )
def tuple(self, mapper=None, flatten=False, reducer=None):
"""Return the dataset as a tuple, applying the optional mapper lambda on each element, applying optional flattener on sequences returned by mapper, and applying the optional reducer lambda on the final tuple, return a generator"""
assert mapper is None or callable(mapper)
assert reducer is None or callable(reducer)
mapped = self.map(mapper) if mapper else self
flattened = (y for x in mapped for y in x) if flatten else (x for x in mapped)
reduced = reducer(flattened) if reducer else flattened
return reduced
def list(self, mapper=None, flatten=False):
"""Return a tuple as a list, loading into memory"""
return self.tuple(mapper, flatten, reducer=list)
def set(self, mapper=None, flatten=False):
"""Return the dataset as a set. Mapper must be a lambda function that returns a hashable type"""
return self.tuple(mapper=mapper, reducer=set, flatten=flatten)
def all(self, mapper):
return self.tuple(mapper=mapper, reducer=all)
def frequency(self, f):
"""Frequency counts for which lambda returns the same value. For example f=lambda im: im.category() returns a dictionary of category names and counts in this category"""
return countby(self.tuple(mapper=f))
def balanced(self, f):
"""Is the dataset balanced (e.g. the frequencies returned from the lambda f are all the same)?"""
return len(set(self.frequency(f).values())) == 1
def count(self, f):
"""Counts for each element for which lamba returns true.
Args:
f: [lambda] if provided, count the number of elements that return true.
Returns:
A length of elements that satisfy f(v) = True [if f is not None]
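For example, to count the elements with a given category (a minimal sketch, assuming a dataset of `vipy.image.ImageCategory` and a hypothetical category name):
```python
D = vipy.dataset.Dataset(...)
n_dogs = D.count(lambda im: im.category() == 'dog')
```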
"""
return len(self.tuple(f, reducer=lambda X: [x for x in X if x is True]))
def countby(self, f):
return self.frequency(f)
def filter(self, f):
"""In place filter with lambda function f, keeping those elements obj in-place where f(obj) evaluates true. Callable should return bool"""
assert callable(f)
return self.index( [i for (b,i) in zip(self.localmap(f), self.index()) if b] )
def take(self, n, inplace=False):
"""Randomly Take n elements from the dataset, and return a dataset (in-place or cloned). If n is greater than the size of the dataset, sample with replacement, if n is less than the size of the dataset, sample without replacement"""
assert isinstance(n, int) and n>0
D = self.clone() if not inplace else self
return D.index(list((random.sample if n<= len(self) else random.choices)(D.index(), k=n)) )
def groupby(self, f):
"""Group the dataset according to the callable f, returning dictionary of grouped datasets."""
assert callable(f)
return {k:self.clone().index([x[1] for x in v]).id('%s:%s' % (self.id(),str(k))) for (k,v) in itertools.groupby(enumerate(self.sort(f).index()), lambda x: f(self[x[0]]))}
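# Example (a minimal sketch, assuming a dataset of vipy.image.ImageCategory elements):
#   groups = D.groupby(lambda im: im.category())   # {category: vipy.dataset.Dataset of that category}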
def takeby(self, f, n):
"""Filter the dataset according to the callable f, take n from each group and return a dataset. Callable should return bool. If n==1, return a singleton"""
d = self.clone().filter(f)
return d.take(n) if n>1 else d.takeone()
def takelist(self, n):
"""Take n elements and return list. The elements are loaded and not cloned."""
return self.take(n).list()
def takeone(self):
"""Randomly take one element from the dataset and return a singleton"""
return self[random.randint(0, len(self)-1)]
def sample(self):
"""Return a single element sampled uniformly at random"""
return self.takeone()
def take_fraction(self, p, inplace=False):
"""Randomly take a percentage of the dataset, returning a clone or in-place"""
assert p>=0 and p<=1, "invalid fraction '%s'" % p
return self.take(n=int(len(self)*p), inplace=inplace)
def inverse_frequency(self, f):
"""Return the inverse frequency of elements grouped by the callable f. Returns a dictionary of the callable output to inverse frequency """
attributes = self.set(f)
frequency = self.frequency(f)
return {a:(1/len(attributes))*(len(self)/frequency[a]) for a in attributes} # (normalized) inverse frequency weight
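# Example (a minimal sketch, assuming vipy.image.ImageCategory elements): a dataset with 90 'dog' and 10 'cat'
# elements gives D.inverse_frequency(lambda im: im.category()) == {'dog': (1/2)*(100/90), 'cat': (1/2)*(100/10)},
# so rare categories receive proportionally larger weights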
def load(self):
"""Cache the entire dataset into memory"""
return Dataset([x for x in self], id=self.id())
def chunk(self, n):
"""Yield n chunks as list. Last chunk will be ragged."""
for (k,c) in enumerate(chunkgen(self, n)):
yield list(c)
def batch(self, n):
"""Yield batches of size n as datasets. Last batch will be ragged. Batches are not loaded. Batches have appended id equal to the zero-indexed batch order"""
for (k,b) in enumerate(chunkgenbysize(self, n)):
yield Dataset(b).id('%s:%d' % (self.id() if self.id() else '', k))
def minibatch(self, n, ragged=True, loader=None, bufsize=1024, accepter=None, preprocessor=None):
"""Yield preprocessed minibatches of size n of this dataset.
To yield chunks of this dataset, suitable for minibatch training/testing
```python
D = vipy.dataset.Dataset(...)
for b in D.minibatch(n):
print(b)
```
To perform minibatch image downloading in parallel across four processes with the context manager:
```python
D = vipy.dataset.registry('yfcc100m_url:train').take(128)
with vipy.globals.parallel(4):
for b in D.minibatch(16, loader=vipy.image.Transform.download, accepter=lambda im: im.is_downloaded()):
print(b) # complete minibatch that passed accepter
```
Args:
n [int]: The size of the minibatch
ragged [bool]: If ragged=true, then the last chunk will be ragged with len(chunk)<n, else skipped
bufsize [int]: The size of the buffer used in parallel processing of elements. Useful for parallel loading
accepter [callable]: A callable that returns true|false on an element, where only elements that return true are included in the minibatch. Useful for parallel loading of elements that may fail to download
loader [callable]: A callable that is applied to every element of the dataset. Useful for parallel loading
Returns:
Iterator over `vipy.dataset.Dataset` elements of length n. Minibatches will be yielded loaded and preprocessed (processing done concurrently if vipy.parallel.executor() is initialized)
.. note:: The distributed iterator appends the minibatch index to the minibatch.id().
.. note:: If there exists a vipy.parallel.executor(), then loading and preprocessing will be performed concurrently
"""
for (k,b) in enumerate(chunkgenbysize(vipy.parallel.iter(self, mapper=loader, bufsize=max(bufsize,n), accepter=accepter), n)):
if ragged or len(b) == n:
yield Dataset.cast(b).id('%s:%d' % (self.id() if self.id() else '', k))
def shift(self, m):
"""Circular shift the dataset m elements to the left, so that self[k+m] == self.shift(m)[k]. Circular shift for boundary handling so that self.shift(m)[-1] == self[m-1]"""
return self.clone().index(self.index()[m:] + self.index()[0:m])
def slice(self, start=0, stop=-1, step=1):
"""Slice the dataset to contain elements defined by slice(start, stop, step)"""
return self.clone().index(self.index()[start:stop:step])
def truncate(self, m):
"""Truncate the dataset to contain the first m elements only"""
return self.slice(stop=m)
def pipeline(self, n, m, ragged=True, prepad=True, postpad=True):
"""Yield pipelined minibatches of size n with pipeline length m.
A pipelined minibatch is a tuple (head, tail) such that (head, tail) are minibatches at different indexes in the dataset.
Head corresponds to the current minibatch and tail corresponds to the minibatch left shifted by (m-1) minibatches.
This structure is useful for yielding datasets for pipelined training where head contains the minibatch that will complete pipeline training on this iteration, and tail contains the
next minibatch to be inserted into the pipeline on this iteration.
```python
D = vipy.dataset.Dataset(...)
for (head, tail) in D.pipeline(n, m, prepad=False, postpad=False):
assert head == D[0:n]   # on the first iteration, head contains the first minibatch
assert tail == D[n*(m-1): n*(m-1)+n]   # and tail is the minibatch shifted left by (m-1) minibatches
```
Args:
n [int]: The size of each minibatch
m [int]: The pipeline length in minibatches
ragged [bool]: If ragged=true, then the last chunk will be ragged with len(chunk)<n, else skipped
prepad: If true, yield (head, tail) == (None, batch) when filling the pipeline
postpad: If true, yield (head, tail) == (batch, None) when flushing the pipeline
Returns:
Iterator over tuples (head,tail) of `vipy.dataset.Dataset` elements of length n where tail is left shifted by n*(m-1) elements.
.. note:: The distributed iterator is not order preserving over minibatches and yields minibatches as completed, however the tuple (head, tail) is order preserving within the pipeline
.. note:: If there exists a vipy.parallel.executor(), then loading and preprocessing will be performed concurrently
"""
pipeline = []
for (k,b) in enumerate(self.minibatch(n, ragged=ragged)): # not order preserving
pipeline.append(b) # order preserving within pipeline
if k < m-1:
if prepad:
yield( (None, b) )
else:
yield( (pipeline.pop(0), b) ) # yield deque-like (minibatch, shifted minibatch) tuples
for p in pipeline:
if postpad:
yield( (p, None) )
def chunks(self, sizes):
"""Partition the dataset into chunks of size given by the tuple in partitions, and give the dataset suffix if provided"""
assert sum(sizes) == len(self)
i = 0
datasets = []
for n in sizes:
datasets.append(self.clone().index(self.index()[i:i+n]))
i += n
return datasets
def partition(self, trainfraction=0.9, valfraction=0.1, testfraction=0, trainsuffix=':train', valsuffix=':val', testsuffix=':test'):
"""Partition the dataset into the requested (train,val,test) fractions.
Args:
trainfraction [float]: fraction of dataset for training set
valfraction [float]: fraction of dataset for validation set
testfraction [float]: fraction of dataset for test set
trainsuffix: If not None, append this string to the trainset ID
valsuffix: If not None, append this string to the valset ID
testsuffix: If not None, append this string to the testset ID
Returns:
(trainset, valset, testset) such that trainset is the first trainfraction of the dataset. If testfraction is zero, returns (trainset, valset) only.
.. note:: This does not permute the dataset. To randomize split, shuffle dataset first
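For example, a randomized 80/10/10 split (a minimal sketch; shuffle first since the split preserves dataset order):
```python
D = vipy.dataset.Dataset(...).shuffle()
(trainset, valset, testset) = D.partition(0.8, 0.1, 0.1)
```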
"""
assert trainfraction >=0 and trainfraction <= 1, "invalid training set fraction '%f'" % trainfraction
assert valfraction >=0 and valfraction <= 1, "invalid validation set fraction '%f'" % valfraction
assert testfraction >=0 and testfraction <= 1, "invalid test set fraction '%f'" % testfraction
assert abs(trainfraction + valfraction + testfraction - 1) < 1E-6, "fractions must sum to one"
idx = self.index()
(testidx, validx, trainidx) = dividelist(idx, (testfraction, valfraction, trainfraction))
trainset = self.clone().index(trainidx)
if trainsuffix and trainset.id():
trainset.id(trainset.id() + trainsuffix)
valset = self.clone().index(validx)
if valsuffix and valset.id():
valset.id(valset.id() + valsuffix)
testset = self.clone().index(testidx)
if testsuffix and testset.id():
testset.id(testset.id() + testsuffix)
return (trainset,valset,testset) if testfraction!=0 else (trainset, valset)
def split(self, size):
"""Split the dataset into two datasets, one of length size, the other of length len(self)-size"""
assert isinstance(size, int) and size>=0 and size<len(self)
return self.partition(size/len(self), (len(self)-size)/len(self), 0, '', '', '')
def even_split(self):
"""Split the dataset into two datasets, each half the size of the dataset. If the dataset length is odd, then one element will be dropped"""
return self.chunks((len(self)//2, len(self)//2, len(self)%2))[0:2]
def streaming_map(self, mapper, accepter=None, bufsize=1024):
"""Returns a generator that will apply the mapper and yield only those elements that return True from the accepter. Performs the map in parallel if used in the vipy.globals.parallel context manager"""
return vipy.parallel.iter(self, mapper=mapper, accepter=accepter, bufsize=bufsize)
def map(self, f_map, strict=True, oneway=False, ordered=False):
"""Parallel map.
To perform this in parallel across four threads:
```python
D = vipy.dataset.Dataset(...)
with vipy.globals.parallel(4):
D = D.map(lambda v: ...)
```
Args:
f_map: [lambda] The lambda function to apply in parallel to all elements in the dataset. This must return a JSON serializable object (or set oneway=True)
strict: [bool] If true, raise exception on distributed map failures, otherwise the map will return only those that succeeded
oneway: [bool] If true, do not pass back results unless exception. This is useful for distributed processing
Returns:
A `vipy.dataset.Dataset` containing the elements f_map(v). This operation is order preserving if ordered=True.
.. note::
- This method uses dask distributed and `vipy.batch.Batch` operations
- Due to chunking, all error handling is caught by this method. Use `vipy.batch.Batch` to leverage dask distributed futures error handling.
- Operations must be chunked and serialized because each dask task comes with overhead, and lots of small tasks violates best practices
- Serialized results are deserialized by the client and returned as a new dataset
"""
assert f_map is None or callable(f_map), "invalid map function"
# Identity
if f_map is None:
return self
# Parallel map
elif vipy.globals.cf() is not None:
# This will fail on multiprocessing if dataset contains a loader lambda, or any element in the dataset contains a loader. Use distributed instead
assert ordered == False, "not order preserving, use localmap()"
return Dataset(tuple(vipy.parallel.map(f_map, self)), id=self.id())
# Distributed map
elif vipy.globals.dask() is not None:
from vipy.batch import Batch # requires pip install vipy[all]
f_serialize = lambda x: x
f_deserialize = lambda x: x
f_oneway = lambda x, oneway=oneway: x if not x[0] or not oneway else (x[0], None)
f_catcher = lambda f, *args, **kwargs: catcher(f, *args, **kwargs) # catch exceptions when executing lambda, return (True, result) or (False, exception)
f = lambda x, f_serializer=f_serialize, f_deserializer=f_deserialize, f_map=f_map, f_catcher=f_catcher, f_oneway=f_oneway: f_serializer(f_oneway(f_catcher(f_map, f_deserializer(x)))) # with closure capture
S = [f_serialize(v) for v in self] # local load, preprocess and serialize
B = Batch(chunklist(S, 128), strict=False, warnme=False, minscatter=128)
S = B.map(lambda X,f=f: [f(x) for x in X]).result() # distributed, chunked, with caught exceptions, may return empty list
V = [f_deserialize(x) for s in S for x in s] # Local deserialization and chunk flattening
# Error handling
(good, bad) = ([r for (b,r) in V if b], [r for (b,r) in V if not b]) # catcher returns (True, result) or (False, exception string)
if len(bad)>0:
log.warning('Exceptions in distributed processing:\n%s\n\n[vipy.dataset.Dataset.map]: %d/%d items failed' % (str(bad), len(bad), len(self)))
if strict:
raise ValueError('exceptions in distributed processing')
return Dataset(good, id=self.id()) if not oneway else None
# Local map
else:
return self.localmap(f_map)
def localmap(self, f):
"""A map performed without any parallel processing"""
return Dataset([f(x) for x in self], id=self.id()) # triggers load into memory
def zip(self, iter):
"""Returns a new dataset constructed by applying the callable on elements from zip(self,iter)"""
return Dataset(zip(self,iter))
def sort(self, f):
"""Sort the dataset in-place using the sortkey lambda function f
To perform a sort of the dataset using some property of the instance, such as the object category (e.g. for vipy.image.ImageCategory)
```python
dataset.sort(lambda im: im.category())
```
"""
idx = self.index()
return self.index( [idx[j] for (j,x) in sorted(zip(range(len(self)), self.tuple(f)), key=lambda x: x[1])] )
@staticmethod
def uniform_shuffler(D):
"""A uniform shuffle on the dataset elements. Iterable access will be slow due to random access"""
idx = D.index()
random.shuffle(idx)
return D.index(idx)
@staticmethod
def streaming_shuffler(D):
"""A uniform shuffle (approximation) on the dataset elements for iterable access only"""
assert D._idx is None, "streaming only"
try_import('datasets', 'datasets'); from datasets import Dataset as HuggingfaceDataset;
if isinstance(D._ds, (list, tuple)):
D._ds = list(D._ds)
random.shuffle(D._ds) # in-place shuffle objects
elif isinstance(D._ds, HuggingfaceDataset):
# Special case: Arrow backed dataset
D._ds = D._ds.to_iterable_dataset() # no random access
D._ds = D._ds.shuffle() # approximate shuffling for IterableDataset is much more efficient for __iter__; shuffle() returns a new dataset, so reassign
else:
raise ValueError('shuffle error')
return D
@staticmethod
def identity_shuffler(D):
"""Shuffler that does nothing"""
return D
class Paged(Dataset):
""" Paged dataset.
A paged dataset is a dataset of length N=M*P constructed from M archive files (the pages) each containing P elements (the pagesize).
The paged dataset must be constructed with tuples of (pagesize, filename).
The loader will fetch, load and cache the pages on demand using the loader, preserving the most recently used cachesize pages
```python
D = vipy.dataset.Paged([(64, 'archive1.pkl'), (64, 'archive2.pkl')], lambda x,y: vipy.load(y))
```
.. note:: Shuffling this dataset is biased. Shuffling will be performed to mix the indexes, but not uniformly at random. The goal is to preserve data locality to minimize cache misses.
"""
def __init__(self, pagelist, loader, id=None, strict=True, index=None, cachesize=32):
super().__init__(dataset=pagelist,
id=id,
loader=loader).index(index if index else list(range(sum([p[0] for p in pagelist]))))
assert callable(loader), "page loader required"
assert not strict or len(set([x[0] for x in self._ds])) == 1 # pagesizes all the same
self._cachesize = cachesize
self._pagecache = {}
self._ds = list(self._ds)
self._pagesize = self._ds[0][0] # (pagesize, pklfile) tuples
def shuffle(self, shuffler=None):
"""Permute elements while preserve page locality to minimize cache misses"""
shuffler = shuffler if shuffler is not None else functools.partial(Paged.chunk_shuffler, chunksize=int(1.5*self._pagesize))
return shuffler(self)
def __getitem__(self, k):
if isinstance(k, (int, np.uint64)):
assert abs(k) < len(self._idx), "invalid index"
page = self._idx[int(k)] // self._pagesize
if page not in self._pagecache:
self._pagecache[page] = self._loader(*self._ds[page]) # load and cache new page
if len(self._pagecache) > self._cachesize:
self._pagecache.pop(list(self._pagecache.keys())[0]) # remove oldest
x = self._pagecache[page][int(k) % self._pagesize]
return x
elif isinstance(k, slice):
return [self[i] for i in range(len(self))[k.start if k.start else 0:k.stop if k.stop else len(self):k.step if k.step else 1]] # expensive
else:
raise ValueError('invalid index type "%s"' % type(k))
def flush(self):
self._pagecache = {}
return self
@staticmethod
def chunk_shuffler(D, chunker, chunksize=64):
"""Split dataset into len(D)/chunksize non-overlapping chunks with some common property returned by chunker, shuffle chunk order and shuffle within chunks.
- If chunksize=1 then this is equivalent to uniform_shuffler
- chunker must be a callable of some property that is used to group into chunks
"""
assert callable(chunker)
return D.randomize().sort(chunker).index([i for I in shufflelist([shufflelist(I) for I in chunkgenbysize(D.index(), chunksize)]) for i in I])
class Union(Dataset):
"""vipy.dataset.Union() class
Common class to manipulate groups of vipy.dataset.Dataset objects in parallel
Usage:
>>> cifar10 = vipy.dataset.registry('cifar10')
>>> mnist = vipy.dataset.registry('mnist')
>>> dataset = vipy.dataset.Union(mnist, cifar10)
>>> dataset = mnist | cifar10
Args:
datasets: one or more `vipy.dataset.Dataset` objects to combine into the union
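Iterating over a union interleaves the member datasets (round-robin when streaming):
>>> for im in (mnist | cifar10): print(im)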
"""
__slots__ = ('_id', '_ds', '_idx', '_loader', '_type')
def __init__(self, *args, **kwargs):
assert all(isinstance(d, (Dataset, )) for d in args), "invalid datasets"
datasets = [d for d in args] # order preserving
assert all([isinstance(d, Dataset) for d in datasets]), "Invalid datasets '%s'" % str([type(d) for d in datasets])
datasets = [j for i in datasets for j in (i.datasets() if isinstance(i, Union) else (i,))] # flatten unions
self._ds = datasets
self._idx = None
self._id = kwargs['id'] if 'id' in kwargs else None
self._loader = None # individual datasets have loaders
self._type = None
def is_streaming(self):
return self._idx is None and all(d.is_streaming() for d in self.datasets())
def __len__(self):
return sum(d.__len__() for d in self.datasets()) if self._idx is None else len(self._idx)
def __iter__(self):
if self.is_streaming():
k = -1
iter = [d.__iter__() for d in self.datasets()] # round-robin
for m in range(len(self.datasets())):
try:
while True:
k = (k + 1) % len(iter)
yield next(iter[k]) # assumes ordered
except StopIteration:
iter.pop(k)
k -= 1
else:
self.index() # force random access
for (i,j) in self._idx:
yield self._ds[i][j] # random access (slower)
def __getitem__(self, k):
self.index() # force random access
if isinstance(k, (int, np.uint64)):
assert abs(k) < len(self._idx), "invalid index"
(i,j) = self._idx[int(k)]
return self._ds[i][j]
elif isinstance(k, slice):
return [self._ds[i][j] for (i,j) in self._idx[k.start:k.stop:k.step]]
else:
raise ValueError('invalid index type "%s"' % type(k))
def __repr__(self):
fields = ['id=%s' % truncate_string(self.id(), maxlen=64)] if self.id() else []
fields += ['len=%d' % len(self)]
fields += ['union=%s' % str(tuple([truncate_string(d.id(), maxlen=80) for d in self._ds]))]
return str('<vipy.dataset.Dataset: %s>' % (', '.join(fields)))
def index(self, index=None, strict=False):
"""Update the index, useful for filtering of large datasets"""
if index is not None:
self._idx = index
return self
if self._idx is None:
# Index on-demand: zipped (dataset index, element index) tuples, in round-robin dataset order [(0,0),(1,0),...,(0,n),(1,n),...]
lengths = [len(d) for d in self.datasets()]
self._idx = [c for r in [[(i,j) for i in range(len(self.datasets()))] for j in range(max(lengths))] for c in r if c[1]<lengths[c[0]]]
return self._idx
def clone(self, deep=False):
"""Return a copy of the dataset object"""
D = super().clone(deep=deep)
D._ds = [d.clone(deep=deep) for d in D._ds]
return D
def datasets(self):
"""Return the dataset union elements, useful for generating unions of unions"""
return list(self._ds)
def shuffle(self, shuffler=None):
"""Permute elements in this dataset uniformly at random in place using the best shuffler for the dataset structure"""
shuffler = shuffler if shuffler is not None else (Union.streaming_shuffler if self.is_streaming() else Dataset.uniform_shuffler)
return shuffler(self)
@staticmethod
def streaming_shuffler(D):
"""A uniform shuffle (approximation) on the dataset elements for iterable access only"""
assert D._idx is None, "iterable dataset only"
D._ds = [Dataset.streaming_shuffler(d) for d in D._ds] # shuffle dataset shards
random.shuffle(D._ds) # shuffle union order
return D
def registry(name=None, datadir=None, freeze=True, clean=False, download=False, split='train'):
"""Common entry point for loading datasets by name.
Usage:
>>> trainset = vipy.dataset.registry('cifar10', split='train') # return a training split
>>> valset = vipy.dataset.registry('cifar10:val', datadir='/tmp/cifar10') # download to a custom location
>>> datasets = vipy.dataset.registry(('cifar10:train','cifar100:train')) # return a union
>>> vipy.dataset.registry() # print allowable datasets
Args:
name [str]: The string name for the dataset. If tuple, return a `vipy.dataset.Union`. If None, return the list of registered datasets. Append name:train, name:val, name:test to output the requested split, or use the split keyword.
datadir [str]: A path to a directory to store data. Defaults to environment variable VIPY_DATASET_REGISTRY_HOME (then VIPY_CACHE if not found). Also uses HF_HOME for huggingface datasets. Datasets will be stored in datadir/name
freeze [bool]: If true, disable reference cycle counting for the loaded object (which will never contain cycles anyway)
clean [bool]: If true, force a redownload of the dataset to correct for partial download errors
download [bool]: If true, force a redownload of the dataset to correct for partial download errors. This is a synonym for clean=True
split [str]: return 'train', 'val' or 'test' split. If None, return (trainset, valset, testset) tuple
Datasets:
'mnist','cifar10','cifar100','caltech101','caltech256','oxford_pets','sun397', 'food101','stanford_dogs',
'flickr30k','oxford_fgvc_aircraft','oxford_flowers_102','eurosat','d2d','ethzshapes','coil100','kthactions',
'yfcc100m','yfcc100m_url','tiny_imagenet','coyo300m','coyo700m','pascal_voc_2007','coco_2014', 'ava',
'activitynet', 'open_images_v7', 'imagenet', 'imagenet21k', 'visualgenome' ,'widerface','meva_kf1',
'objectnet','lfw','inaturalist_2021','kinetics','hmdb','places365','ucf101','lvis','kitti',
'imagenet_localization','laion2b','datacomp_1b','imagenet2014_det','imagenet_faces','youtubeBB',
'pip_370k','pip_175k','cap','cap_pad','cap_detection','tiny_virat'
Returns:
(trainset, valset, testset) tuple where each is a `vipy.dataset.Dataset` or None, or a single split if name has a ":SPLIT" suffix or split kwarg provided
"""
import vipy.data
datasets = ('mnist','cifar10','cifar100','caltech101','caltech256','oxford_pets','sun397', 'stanford_dogs','coil100',
'flickr30k','oxford_fgvc_aircraft','oxford_flowers_102', 'food101', 'eurosat','d2d','ethzshapes','kthactions',
'yfcc100m','yfcc100m_url','tiny_imagenet','coyo300m','coyo700m','pascal_voc_2007','coco_2014', 'ava',
'activitynet','open_images_v7','imagenet','imagenet21k','visualgenome','widerface', 'youtubeBB',
'objectnet','lfw','inaturalist_2021','kinetics','hmdb','places365','ucf101','kitti','meva_kf1',
'lvis','imagenet_localization','laion2b','datacomp_1b','imagenet2014_det','imagenet_faces',
'pip_175k','pip_370k','cap','cap_pad','cap_detection','tiny_virat') # Add to docstring too...
if name is None:
return tuple(sorted(datasets))
if isinstance(name, (tuple, list)):
assert all(n.startswith(datasets) for n in name)
assert split is not None or all(':' in n for n in name)
return Union(*(registry(n, datadir=datadir, freeze=freeze, clean=clean, download=download, split=split) for n in name))
(name, split) = name.split(':',1) if name.count(':')>0 else (name, split)
if name not in datasets:
raise ValueError('unknown dataset "%s" - choose from "%s"' % (name, ', '.join(sorted(datasets))))
if split not in [None, 'train', 'test', 'val']:
raise ValueError('unknown split "%s" - choose from "%s"' % (split, ', '.join([str(None), 'train', 'test', 'val'])))
datadir = remkdir(datadir if datadir is not None else (env('VIPY_DATASET_REGISTRY_HOME') if 'VIPY_DATASET_REGISTRY_HOME' in env() else cache()))
namedir = Path(datadir)/name
if (clean or download) and name in datasets and os.path.exists(namedir):
log.info('Removing cached dataset "%s"' % namedir)
shutil.rmtree(namedir) # delete cached subtree to force redownload ...
if freeze:
gc.disable()
(trainset, valset, testset) = (None, None, None)
if name == 'mnist':
(trainset, testset) = vipy.data.hf.mnist()
elif name == 'cifar10':
(trainset, testset) = vipy.data.hf.cifar10()
elif name == 'cifar100':
(trainset, testset) = vipy.data.hf.cifar100()
elif name == 'caltech101':
trainset = vipy.data.caltech101.Caltech101(namedir)
elif name == 'caltech256':
trainset = vipy.data.caltech256.Caltech256(namedir)
elif name == 'oxford_pets':
(trainset, testset) = vipy.data.hf.oxford_pets()
elif name == 'sun397':
(trainset, valset, testset) = vipy.data.hf.sun397()
elif name == 'stanford_dogs':
trainset = vipy.data.stanford_dogs.StanfordDogs(namedir)
elif name == 'food101':
trainset = vipy.data.food101.Food101(namedir)
elif name == 'eurosat':
trainset = vipy.data.eurosat.EuroSAT(namedir)
elif name == 'd2d':
trainset = vipy.data.d2d.D2D(namedir)
elif name == 'coil100':
trainset = vipy.data.coil100.COIL100(namedir)
elif name == 'kthactions':
(trainset, testset) = vipy.data.kthactions.KTHActions(namedir).split()
elif name == 'ethzshapes':
trainset = vipy.data.ethzshapes.ETHZShapes(namedir)
elif name == 'flickr30k':
trainset = vipy.data.hf.flickr30k()
elif name == 'oxford_fgvc_aircraft':
trainset = vipy.data.hf.oxford_fgvc_aircraft()
elif name == 'oxford_flowers_102':
trainset = vipy.data.oxford_flowers_102.Flowers102(namedir)
elif name == 'yfcc100m':
(trainset, _, valset, _) = vipy.data.hf.yfcc100m()
elif name == 'yfcc100m_url':
(_, trainset, _, valset) = vipy.data.hf.yfcc100m()
elif name == 'tiny_imagenet':
(trainset, valset) = vipy.data.hf.tiny_imagenet()
elif name == 'coyo300m':
trainset = vipy.data.hf.coyo300m()
elif name == 'coyo700m':
trainset = vipy.data.hf.coyo700m()
elif name == 'datacomp_1b':
trainset = vipy.data.hf.datacomp_1b()
elif name == 'pascal_voc_2007':
(trainset, valset, testset) = vipy.data.hf.pascal_voc_2007()
elif name == 'coco_2014':
trainset = vipy.data.coco.COCO_2014(namedir)
elif name == 'ava':
ava = vipy.data.ava.AVA(namedir)
(trainset, valset) = (ava.trainset(), ava.valset())
elif name == 'activitynet':
activitynet = vipy.data.activitynet.ActivityNet(namedir) # ActivityNet 200
(trainset, valset, testset) = (activitynet.trainset(), activitynet.valset(), activitynet.testset())
elif name == 'open_images_v7':
trainset = vipy.data.openimages.open_images_v7(namedir)
elif name == 'imagenet':
imagenet = vipy.data.imagenet.Imagenet2012(namedir)
(trainset, valset) = (imagenet.classification_trainset(), imagenet.classification_valset())
elif name == 'imagenet_faces':
trainset = vipy.data.imagenet.Imagenet2012(Path(datadir)/'imagenet').faces()
elif name == 'imagenet21k':
trainset = vipy.data.imagenet.Imagenet21K(namedir)
elif name == 'visualgenome':
trainset = vipy.data.visualgenome.VisualGenome(namedir) # visualgenome-1.4
elif name == 'widerface':
trainset = vipy.data.widerface.WiderFace(namedir, split='train')
valset = vipy.data.widerface.WiderFace(namedir, split='val')
testset = vipy.data.widerface.WiderFace(namedir, split='test')
elif name == 'objectnet':
assert split is None or split == 'test', "objectnet is a test set"
testset = vipy.data.objectnet.Objectnet(namedir)
elif name == 'lfw':
trainset = vipy.data.lfw.LFW(namedir)
elif name == 'inaturalist_2021':
dataset = vipy.data.inaturalist.iNaturalist2021(namedir)
(trainset, valset) = (dataset.trainset(), dataset.valset())
elif name == 'kinetics':
dataset = vipy.data.kinetics.Kinetics700(namedir) # Kinetics700
(trainset, valset, testset) = (dataset.trainset(), dataset.valset(), dataset.testset())
elif name == 'hmdb':
trainset = vipy.dataset.Dataset(vipy.data.hmdb.HMDB(namedir).dataset(), id='hmdb')
elif name == 'places365':
places = vipy.data.places.Places365(namedir)
(trainset, valset) = (places.trainset(), places.valset())
elif name == 'ucf101':
trainset = vipy.data.ucf101.UCF101(namedir)
elif name == 'kitti':
trainset = vipy.data.kitti.KITTI(namedir, split='train')
testset = vipy.data.kitti.KITTI(namedir, split='test')
elif name == 'lvis':
lvis = vipy.data.lvis.LVIS(namedir)
(trainset, valset) = (lvis.trainset(), lvis.valset())
elif name == 'imagenet_localization':
trainset = vipy.data.imagenet.Imagenet2012(Path(datadir)/'imagenet').localization_trainset()
elif name == 'imagenet2014_det':
imagenet2014_det = vipy.data.imagenet.Imagenet2014_DET(namedir)
(trainset, valset, testset) = (imagenet2014_det.trainset(), imagenet2014_det.valset(), imagenet2014_det.testset())
elif name == 'laion2b':
trainset = vipy.data.hf.laion2b()
elif name == 'youtubeBB':
trainset = vipy.data.youtubeBB.YoutubeBB(namedir)
elif name == 'meva_kf1':
trainset = vipy.data.meva.KF1(namedir).dataset() # consider using "with vipy.globals.multiprocessing(pct=0.5):"
elif name == 'pip_175k':
trainset = vipy.data.pip.PIP_175k(namedir)
elif name == 'pip_370k':
trainset = vipy.data.pip.PIP_370k_stabilized(namedir)
elif name == 'cap':
trainset = vipy.data.cap.CAP_classification_clip(namedir)
elif name == 'cap_pad':
trainset = vipy.data.cap.CAP_classification_pad(namedir)
elif name == 'cap_detection':
trainset = vipy.data.cap.CAP_detection(namedir)
elif name == 'tiny_virat':
dataset = vipy.data.tiny_virat.TinyVIRAT(namedir)
(trainset, valset, testset) = (dataset.trainset(), dataset.valset(), dataset.testset())
else:
raise ValueError('unknown dataset "%s" - choose from "%s"' % (name, ', '.join(sorted(datasets))))
if freeze:
gc.enable()
gc.collect()
gc.freeze() # python-3.7
if split == 'train':
return trainset
elif split == 'val':
return valset
elif split == 'test':
return testset
else:
return (trainset, valset, testset)
for (head, tail) in D.pipeline(n, m, prepad=False, postpad=False): assert head == D[0:m] assert tail == D[n*(m-1): n*(m-1)+n] Args: n [int]: The size of each minibatch m [int]: The pipeline length in minibatches ragged [bool]: If ragged=true, then the last chunk will be ragged with len(chunk)<n, else skipped prepad: If true, yield (head, tail) == (None, batch) when filling the pipeline postpad: If true, yield (head, tail) == (batch, None) when flushing the pipeline Returns: Iterator over tuples (head,tail) of `vipy.dataset.Dataset` elements of length n where tail is left shifted by n*(m-1) elements. .. note:: The distributed iterator is not order preserving over minibatches and yields minibatches as completed, however the tuple (head, tail) is order preserving within the pipeline .. note:: If there exists a vipy.parallel.executor(), then loading and preprocessing will be performed concurrently """ pipeline = [] for (k,b) in enumerate(self.minibatch(n, ragged=ragged)): # not order preserving pipeline.append(b) # order preserving within pipeline if k < m-1: if prepad: yield( (None, b) ) else: yield( (pipeline.pop(0), b) ) # yield deque-like (minibatch, shifted minibatch) tuples for p in pipeline: if postpad: yield( (p, None) ) def chunks(self, sizes): """Partition the dataset into chunks of size given by the tuple in partitions, and give the dataset suffix if provided""" assert sum(sizes) == len(self) i = 0 datasets = [] for n in sizes: datasets.append(self.clone().index(self.index()[i:i+n])) i += n return datasets def partition(self, trainfraction=0.9, valfraction=0.1, testfraction=0, trainsuffix=':train', valsuffix=':val', testsuffix=':test'): """Partition the dataset into the requested (train,val,test) fractions. Args: trainfraction [float]: fraction of dataset for training set valfraction [float]: fraction of dataset for validation set testfraction [float]: fraction of dataset for test set trainsuffix: If not None, append this string the to trainset ID valsuffix: If not None, append this string the to valset ID testsuffix: If not None, append this string the to testset ID Returns: (trainset, valset, testset) such that trainset is the first trainfraction of the dataset. .. note:: This does not permute the dataset. 
To randomize split, shuffle dataset first """ assert trainfraction >=0 and trainfraction <= 1, "invalid training set fraction '%f'" % trainfraction assert valfraction >=0 and valfraction <= 1, "invalid validation set fraction '%f'" % valfraction assert testfraction >=0 and testfraction <= 1, "invalid test set fraction '%f'" % testfraction assert abs(trainfraction + valfraction + testfraction - 1) < 1E-6, "fractions must sum to one" idx = self.index() (testidx, validx, trainidx) = dividelist(idx, (testfraction, valfraction, trainfraction)) trainset = self.clone().index(trainidx) if trainsuffix and trainset.id(): trainset.id(trainset.id() + trainsuffix) valset = self.clone().index(validx) if valsuffix and valset.id(): valset.id(valset.id() + valsuffix) testset = self.clone().index(testidx) if testsuffix and testset.id(): testset.id(testset.id() + testsuffix) return (trainset,valset,testset) if testfraction!=0 else (trainset, valset) def split(self, size): """Split the dataset into two datasets, one of length size, the other of length len(self)-size""" assert isinstance(size, int) and size>=0 and size<len(self) return self.partition(size/len(self), (len(self)-size)/len(self), 0, '', '', '') def even_split(self): """Split the dataset into two datasets, each half the size of the dataset. If the dataset length is odd, then one element will be dropped""" return self.chunks((len(self)//2, len(self)//2, len(self)%2))[0:2] def streaming_map(self, mapper, accepter=None, bufsize=1024): """Returns a generator that will apply the mapper and yield only those elements that return True from the accepter. Performs the map in parallel if used in the vipy.globals.parallel context manager""" return vipy.parallel.iter(self, mapper=mapper, accepter=accepter, bufsize=bufsize) def map(self, f_map, strict=True, oneway=False, ordered=False): """Parallel map. To perform this in parallel across four threads: ```python D = vipy.dataset.Dataset(...) with vipy.globals.parallel(4): D = D.map(lambda v: ...) ``` Args: f_map: [lambda] The lambda function to apply in parallel to all elements in the dataset. This must return a JSON serializable object (or set oneway=True) strict: [bool] If true, raise exception on distributed map failures, otherwise the map will return only those that succeeded oneway: [bool] If true, do not pass back results unless exception. This is useful for distributed processing Returns: A `vipy.dataset.Dataset` containing the elements f_map(v). This operation is order preserving if ordered=True. .. note:: - This method uses dask distributed and `vipy.batch.Batch` operations - Due to chunking, all error handling is caught by this method. Use `vipy.batch.Batch` to leverage dask distributed futures error handling. - Operations must be chunked and serialized because each dask task comes with overhead, and lots of small tasks violates best practices - Serialized results are deserialized by the client and returned a a new dataset """ assert f_map is None or callable(f_map), "invalid map function" # Identity if f_map is None: return self # Parallel map elif vipy.globals.cf() is not None: # This will fail on multiprocessing if dataset contains a loader lambda, or any element in the dataset contains a loader. 
Use distributed instead assert ordered == False, "not order preserving, use localmap()" return Dataset(tuple(vipy.parallel.map(f_map, self)), id=self.id()) # Distributed map elif vipy.globals.dask() is not None: from vipy.batch import Batch # requires pip install vipy[all] f_serialize = lambda x: x f_deserialize = lambda x: x f_oneway = lambda x, oneway=oneway: x if not x[0] or not oneway else (x[0], None) f_catcher = lambda f, *args, **kwargs: catcher(f, *args, **kwargs) # catch exceptions when executing lambda, return (True, result) or (False, exception) f = lambda x, f_serializer=f_serialize, f_deserializer=f_deserialize, f_map=f_map, f_catcher=f_catcher, f_oneway=f_oneway: f_serializer(f_oneway(f_catcher(f_map, f_deserializer(x)))) # with closure capture S = [f_serialize(v) for v in self] # local load, preprocess and serialize B = Batch(chunklist(S, 128), strict=False, warnme=False, minscatter=128) S = B.map(lambda X,f=f: [f(x) for x in X]).result() # distributed, chunked, with caught exceptions, may return empty list V = [f_deserialize(x) for s in S for x in s] # Local deserialization and chunk flattening # Error handling (good, bad) = ([r for (b,r) in V if b], [r for (b,r) in V if not b]) # catcher returns (True, result) or (False, exception string) if len(bad)>0: log.warning('Exceptions in distributed processing:\n%s\n\n[vipy.dataset.Dataset.map]: %d/%d items failed' % (str(bad), len(bad), len(self))) if strict: raise ValueError('exceptions in distributed processing') return Dataset(good, id=self.id()) if not oneway else None # Local map else: return self.localmap(f_map) def localmap(self, f): """A map performed without any parallel processing""" return Dataset([f(x) for x in self], id=self.id()) # triggers load into memory def zip(self, iter): """Returns a new dataset constructed by applying the callable on elements from zip(self,iter)""" return Dataset(zip(self,iter)) def sort(self, f): """Sort the dataset in-place using the sortkey lambda function f To perform a sort of the dataset using some property of the instance, such as the object category (e.g. for vipy.image.ImageCategory) ```python dataset.sort(lambda im: im.category()) ``` """ idx = self.index() return self.index( [idx[j] for (j,x) in sorted(zip(range(len(self)), self.tuple(f)), key=lambda x: x[1])] ) @staticmethod def uniform_shuffler(D): """A uniform shuffle on the dataset elements. Iterable access will be slow due to random access""" idx = D.index() random.shuffle(idx) return D.index(idx) @staticmethod def streaming_shuffler(D): """A uniform shuffle (approximation) on the dataset elements for iterable access only""" assert D._idx is None, "streaming only" try_import('datasets', 'datasets'); from datasets import Dataset as HuggingfaceDataset; if isinstance(D._ds, (list, tuple)): D._ds = list(D._ds) random.shuffle(D._ds) # in-place shuffle objects elif isinstance(D._ds, HuggingfaceDataset): # Special case: Arrow backed dataset D._ds = D._ds.to_iterable_dataset() # no random access D._ds.shuffle() # approximate shuffling for IterableDataset is much more efficient for __iter__ else: raise ValueError('shuffle error') return D @staticmethod def identity_shuffler(D): """Shuffler that does nothing""" return D
Subclasses
- Caltech101
- Caltech256
- CAP_classification_clip
- CAP_classification_pad
- CAP_detection
- CC12M
- CelebA
- COCO_2014
- COIL100
- D2D
- ETHZShapes
- EuroSAT
- Food101
- Food2k
- Imagenet21K
- Imagenet21K_Resized
- iNaturalist2021
- KITTI
- LFW
- MIT67
- Objectnet
- Flowers102
- PIP_175k
- PIP_370k_stabilized
- StanfordCars
- StanfordDogs
- UCF101
- VisualGenome
- WiderFace
- YoutubeBB
- Paged
- Union
Static methods
def cast(obj)
def from_directory(indir, filetype='json', id=None)
-
Recursively search indir for filetype, construct a dataset from all discovered files of that type
def from_image_urls(urls, id=None)
-
Construct a dataset from a list of image URLs
def from_json(jsonfile, id=None)
def identity_shuffler(D)
-
Shuffler that does nothing
Expand source code Browse git
@staticmethod def identity_shuffler(D): """Shuffler that does nothing""" return D
def streaming_shuffler(D)
-
A uniform shuffle (approximation) on the dataset elements for iterable access only
Expand source code Browse git
@staticmethod def streaming_shuffler(D): """A uniform shuffle (approximation) on the dataset elements for iterable access only""" assert D._idx is None, "streaming only" try_import('datasets', 'datasets'); from datasets import Dataset as HuggingfaceDataset; if isinstance(D._ds, (list, tuple)): D._ds = list(D._ds) random.shuffle(D._ds) # in-place shuffle objects elif isinstance(D._ds, HuggingfaceDataset): # Special case: Arrow backed dataset D._ds = D._ds.to_iterable_dataset() # no random access D._ds.shuffle() # approximate shuffling for IterableDataset is much more efficient for __iter__ else: raise ValueError('shuffle error') return D
def uniform_shuffler(D)
-
A uniform shuffle on the dataset elements. Iterable access will be slow due to random access
Expand source code Browse git
@staticmethod def uniform_shuffler(D): """A uniform shuffle on the dataset elements. Iterable access will be slow due to random access""" idx = D.index() random.shuffle(idx) return D.index(idx)
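A shuffler can also be passed explicitly to `Dataset.shuffle` to override the default strategy. A minimal sketch on a toy in-memory dataset (the data and id are placeholders, not from the library examples):
```python
import vipy.dataset

D = vipy.dataset.Dataset(list(range(8)), id='toy')
D = D.shuffle(vipy.dataset.Dataset.uniform_shuffler)    # explicit random-access shuffle of the index
D = D.shuffle(vipy.dataset.Dataset.identity_shuffler)   # no-op shuffler, order is left unchanged
print(D.list())
```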
Methods
def all(self, mapper)
-
Expand source code Browse git
def all(self, mapper): return self.tuple(mapper=mapper, reducer=all)
def balanced(self, f)
-
Is the dataset balanced (i.e. the frequencies returned by the lambda f are all the same)?
Expand source code Browse git
def balanced(self, f): """Is the dataset balanced (e.g. the frequencies returned from the lambda f are all the same)?""" return len(set(self.frequency(f).values())) == 1
def batch(self, n)
-
Yield batches of size n as datasets. The last batch will be ragged. Batches are not loaded. Each batch id is the dataset id with the zero-indexed batch number appended
Expand source code Browse git
def batch(self, n): """Yield batches of size n as datasets. Last batch will be ragged. Batches are not loaded. Batches have appended id equal to the zero-indexed batch order""" for (k,b) in enumerate(chunkgenbysize(self, n)): yield Dataset(b).id('%s:%d' % (self.id() if self.id() else '', k))
def chunk(self, n)
-
Yield n chunks as lists. The last chunk will be ragged.
Expand source code Browse git
def chunk(self, n): """Yield n chunks as list. Last chunk will be ragged.""" for (k,c) in enumerate(chunkgen(self, n)): yield list(c)
def chunks(self, sizes)
-
Partition the dataset into chunks whose sizes are given by the tuple sizes; the sizes must sum to the length of the dataset
Expand source code Browse git
def chunks(self, sizes): """Partition the dataset into chunks of size given by the tuple in partitions, and give the dataset suffix if provided""" assert sum(sizes) == len(self) i = 0 datasets = [] for n in sizes: datasets.append(self.clone().index(self.index()[i:i+n])) i += n return datasets
def clone(self, deep=False)
-
Return a copy of the dataset object
Expand source code Browse git
def clone(self, deep=False): """Return a copy of the dataset object""" if not deep: return copy.copy(self) else: return copy.deepcopy(self)
def count(self, f)
-
Counts the number of elements for which the lambda returns true.
Args
f
- [lambda] if provided, count the number of elements that return true.
Returns
The number of elements that satisfy f(v) == True [if f is not None]
Expand source code Browse git
def count(self, f): """Counts for each element for which lamba returns true. Args: f: [lambda] if provided, count the number of elements that return true. Returns: A length of elements that satisfy f(v) = True [if f is not None] """ return len(self.tuple(f, reducer=lambda X: [x for x in X if x is True]))
def countby(self, f)
-
Expand source code Browse git
def countby(self, f): return self.frequency(f)
def even_split(self)
-
Split the dataset into two datasets, each half the size of the dataset. If the dataset length is odd, then one element will be dropped
Expand source code Browse git
def even_split(self): """Split the dataset into two datasets, each half the size of the dataset. If the dataset length is odd, then one element will be dropped""" return self.chunks((len(self)//2, len(self)//2, len(self)%2))[0:2]
def filter(self, f)
-
In-place filter with lambda function f, keeping those elements obj where f(obj) evaluates true. The callable should return bool
Expand source code Browse git
def filter(self, f): """In place filter with lambda function f, keeping those elements obj in-place where f(obj) evaluates true. Callable should return bool""" assert callable(f) return self.index( [i for (b,i) in zip(self.localmap(f), self.index()) if b] )
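Since `filter` keeps elements in place by rewriting the index, it is cheap even for large datasets. A minimal sketch over plain Python objects (any list works as the backing dataset; the data here is a toy example):
```python
import vipy.dataset

D = vipy.dataset.Dataset(list(range(10)), id='toy')
evens = D.clone().filter(lambda x: x % 2 == 0)   # filter the clone in place, D is unchanged
assert sorted(evens.list()) == [0, 2, 4, 6, 8]
assert len(D) == 10
```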
def frequency(self, f)
-
Frequency counts of the values returned by the lambda. For example, f=lambda im: im.category() returns a dictionary mapping each category name to the number of elements with that category
Expand source code Browse git
def frequency(self, f): """Frequency counts for which lambda returns the same value. For example f=lambda im: im.category() returns a dictionary of category names and counts in this category""" return countby(self.tuple(mapper=f))
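For example, `frequency`, `balanced` and `inverse_frequency` (documented below) can be combined to inspect class balance. A minimal sketch over a toy list of labels:
```python
import vipy.dataset

D = vipy.dataset.Dataset(['cat', 'dog', 'cat', 'bird', 'dog', 'cat'], id='labels')
print(D.frequency(lambda x: x))          # {'cat': 3, 'dog': 2, 'bird': 1}
print(D.balanced(lambda x: x))           # False, the counts differ
print(D.inverse_frequency(lambda x: x))  # normalized inverse frequency weights, largest for 'bird'
```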
def groupby(self, f)
-
Group the dataset according to the callable f, returning dictionary of grouped datasets.
Expand source code Browse git
def groupby(self, f): """Group the dataset according to the callable f, returning dictionary of grouped datasets.""" assert callable(f) return {k:self.clone().index([x[1] for x in v]).id('%s:%s' % (self.id(),str(k))) for (k,v) in itertools.groupby(enumerate(self.sort(f).index()), lambda x: f(self[x[0]]))}
def id(self, new_id=None)
-
Change the dataset ID to the provided ID, or return it if None
Expand source code Browse git
def id(self, new_id=None): """Change the dataset ID to the provided ID, or return it if None""" if new_id is not None: self._id = new_id return self return self._id
def index(self, index=None, strict=False)
-
Update the index, useful for filtering of large datasets
Expand source code Browse git
def index(self, index=None, strict=False): """Update the index, useful for filtering of large datasets""" if index is not None: assert not strict or index is None or (len(index)>0 and len(index)<=len(self) and max(index)<len(self) and min(index)>=0) self._idx = index return self if self._idx is None: self._idx = list(range(len(self._ds))) # on-demand index, only if underlying dataset has known length return self._idx
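Assigning an index restricts the dataset to a view over selected elements without copying them. A minimal sketch on toy data:
```python
import vipy.dataset

D = vipy.dataset.Dataset(['a', 'b', 'c', 'd'], id='letters')
D.index([3, 1])        # view over elements three and one, in that order
print(D.list())        # ['d', 'b']
print(D.index())       # [3, 1]
```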
def inverse_frequency(self, f)
-
Return the inverse frequency of elements grouped by the callable f. Returns a dictionary of the callable output to inverse frequency
Expand source code Browse git
def inverse_frequency(self, f): """Return the inverse frequency of elements grouped by the callable f. Returns a dictionary of the callable output to inverse frequency """ attributes = self.set(f) frequency = self.frequency(f) return {a:(1/len(attributes))*(len(self)/frequency[a]) for a in attributes} # (normalized) inverse frequency weight
def is_streaming(self)
-
Expand source code Browse git
def is_streaming(self): return self._idx is None
def len(self)
-
Expand source code Browse git
def len(self): return len(self._idx) if self._idx is not None else (len(self._ds) if hasattr(self._ds, '__len__') else None)
def list(self, mapper=None, flatten=False)
-
Return the dataset as a list, loading it into memory
Expand source code Browse git
def list(self, mapper=None, flatten=False): """Return a tuple as a list, loading into memory""" return self.tuple(mapper, flatten, reducer=list)
def load(self)
-
Cache the entire dataset into memory
Expand source code Browse git
def load(self): """Cache the entire dataset into memory""" return Dataset([x for x in self], id=self.id())
def localmap(self, f)
-
A map performed without any parallel processing
Expand source code Browse git
def localmap(self, f): """A map performed without any parallel processing""" return Dataset([f(x) for x in self], id=self.id()) # triggers load into memory
def map(self, f_map, strict=True, oneway=False, ordered=False)
-
Parallel map.
To perform this in parallel across four threads:
D = vipy.dataset.Dataset(...)
with vipy.globals.parallel(4):
    D = D.map(lambda v: ...)
Args
f_map
- [lambda] The lambda function to apply in parallel to all elements in the dataset. This must return a JSON serializable object (or set oneway=True)
strict
- [bool] If true, raise exception on distributed map failures, otherwise the map will return only those that succeeded
oneway
- [bool] If true, do not pass back results unless exception. This is useful for distributed processing
Returns
A `vipy.dataset.Dataset` containing the elements f_map(v). This operation is order preserving if ordered=True.
Note
- This method uses dask distributed and `vipy.batch.Batch` operations
- Due to chunking, all error handling is caught by this method. Use `vipy.batch.Batch` to leverage dask distributed futures error handling.
- Operations must be chunked and serialized because each dask task comes with overhead, and lots of small tasks violate best practices
- Serialized results are deserialized by the client and returned as a new dataset
Expand source code Browse git
def map(self, f_map, strict=True, oneway=False, ordered=False): """Parallel map. To perform this in parallel across four threads: ```python D = vipy.dataset.Dataset(...) with vipy.globals.parallel(4): D = D.map(lambda v: ...) ``` Args: f_map: [lambda] The lambda function to apply in parallel to all elements in the dataset. This must return a JSON serializable object (or set oneway=True) strict: [bool] If true, raise exception on distributed map failures, otherwise the map will return only those that succeeded oneway: [bool] If true, do not pass back results unless exception. This is useful for distributed processing Returns: A `vipy.dataset.Dataset` containing the elements f_map(v). This operation is order preserving if ordered=True. .. note:: - This method uses dask distributed and `vipy.batch.Batch` operations - Due to chunking, all error handling is caught by this method. Use `vipy.batch.Batch` to leverage dask distributed futures error handling. - Operations must be chunked and serialized because each dask task comes with overhead, and lots of small tasks violates best practices - Serialized results are deserialized by the client and returned a a new dataset """ assert f_map is None or callable(f_map), "invalid map function" # Identity if f_map is None: return self # Parallel map elif vipy.globals.cf() is not None: # This will fail on multiprocessing if dataset contains a loader lambda, or any element in the dataset contains a loader. Use distributed instead assert ordered == False, "not order preserving, use localmap()" return Dataset(tuple(vipy.parallel.map(f_map, self)), id=self.id()) # Distributed map elif vipy.globals.dask() is not None: from vipy.batch import Batch # requires pip install vipy[all] f_serialize = lambda x: x f_deserialize = lambda x: x f_oneway = lambda x, oneway=oneway: x if not x[0] or not oneway else (x[0], None) f_catcher = lambda f, *args, **kwargs: catcher(f, *args, **kwargs) # catch exceptions when executing lambda, return (True, result) or (False, exception) f = lambda x, f_serializer=f_serialize, f_deserializer=f_deserialize, f_map=f_map, f_catcher=f_catcher, f_oneway=f_oneway: f_serializer(f_oneway(f_catcher(f_map, f_deserializer(x)))) # with closure capture S = [f_serialize(v) for v in self] # local load, preprocess and serialize B = Batch(chunklist(S, 128), strict=False, warnme=False, minscatter=128) S = B.map(lambda X,f=f: [f(x) for x in X]).result() # distributed, chunked, with caught exceptions, may return empty list V = [f_deserialize(x) for s in S for x in s] # Local deserialization and chunk flattening # Error handling (good, bad) = ([r for (b,r) in V if b], [r for (b,r) in V if not b]) # catcher returns (True, result) or (False, exception string) if len(bad)>0: log.warning('Exceptions in distributed processing:\n%s\n\n[vipy.dataset.Dataset.map]: %d/%d items failed' % (str(bad), len(bad), len(self))) if strict: raise ValueError('exceptions in distributed processing') return Dataset(good, id=self.id()) if not oneway else None # Local map else: return self.localmap(f_map)
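For example, `localmap` applies the function serially, while `map` uses the executor created by the vipy.globals.parallel context manager (the exact executor behaviour depends on the vipy.globals configuration). A minimal sketch on toy data:
```python
import vipy.dataset
import vipy.globals

def square(x):
    return x * x   # defined at top level so it can be serialized if the executor uses processes

D = vipy.dataset.Dataset(list(range(8)), id='toy')
serial = D.localmap(square)          # no executor required
with vipy.globals.parallel(2):
    parallel = D.map(square)         # not order preserving (ordered=False)
assert sorted(parallel.list()) == serial.list()
```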
def minibatch(self, n, ragged=True, loader=None, bufsize=1024, accepter=None, preprocessor=None)
-
Yield preprocessed minibatches of size n of this dataset.
To yield chunks of this dataset, suitable for minibatch training/testing
D = vipy.dataset.Dataset(...)
for b in D.minibatch(n):
    print(b)
To perform minibatch image downloading in parallel across four processes with the context manager:
D = vipy.dataset.registry('yfcc100m_url:train').take(128)
with vipy.globals.parallel(4):
    for b in D.minibatch(16, loader=vipy.image.Transform.download, accepter=lambda im: im.is_downloaded()):
        print(b)  # complete minibatch that passed accepter
Args
- n [int]: The size of the minibatch
- ragged [bool]: If ragged=true, then the last chunk will be ragged with len(chunk)<n, else skipped
- bufsize [int]: The size of the buffer used in parallel processing of elements. Useful for parallel loading
- accepter [callable]: A callable that returns true|false on an element, where only elements that return true are included in the minibatch. Useful for parallel loading of elements that may fail to download
- loader [callable]: A callable that is applied to every element of the dataset. Useful for parallel loading
Returns
Iterator over `vipy.dataset.Dataset` elements of length n. Minibatches will be yielded loaded and preprocessed (processing done concurrently if vipy.parallel.executor() is initialized)
Note: The distributed iterator appends the minibatch index to the minibatch.id().
Note: If there exists a vipy.parallel.executor(), then loading and preprocessing will be performed concurrently
Expand source code Browse git
def minibatch(self, n, ragged=True, loader=None, bufsize=1024, accepter=None, preprocessor=None): """Yield preprocessed minibatches of size n of this dataset. To yield chunks of this dataset, suitable for minibatch training/testing ```python D = vipy.dataset.Dataset(...) for b in D.minibatch(n): print(b) ``` To perform minibatch image downloading in parallel across four processes with the context manager: ```python D = vipy.dataset.registry('yfcc100m_url:train').take(128) with vipy.globals.parallel(4): for b in D.minibatch(16, loader=vipy.image.Transform.download, accepter=lambda im: im.is_downloaded()): print(b) # complete minibatch that passed accepter ``` Args: n [int]: The size of the minibatch ragged [bool]: If ragged=true, then the last chunk will be ragged with len(chunk)<n, else skipped bufsize [int]: The size of the buffer used in parallel processing of elements. Useful for parallel loading accepter [callable]: A callable that returns true|false on an element, where only elements that return true are included in the minibatch. useful for parallel loading of elements that may fail to download loader [callable]: A callable that is applied to every element of the dataset. Useful for parallel loading Returns: Iterator over `vipy.dataset.Dataset` elements of length n. Minibatches will be yielded loaded and preprocessed (processing done concurrently if vipy.parallel.executor() is initialized) ..note:: The distributed iterator appends the minibatch index to the minibatch.id(). ..note:: If there exists a vipy.parallel.exeuctor(), then loading and preprocessing will be performed concurrently """ for (k,b) in enumerate(chunkgenbysize(vipy.parallel.iter(self, mapper=loader, bufsize=max(bufsize,n), accepter=accepter), n)): if ragged or len(b) == n: yield Dataset.cast(b).id('%s:%d' % (self.id() if self.id() else '', k))
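Without a parallel executor the minibatches are generated serially. A minimal sketch on toy data showing the ragged final batch and the appended batch index (expected output assumes serial, order-preserving iteration):
```python
import vipy.dataset

D = vipy.dataset.Dataset(list(range(10)), id='toy')
for b in D.minibatch(4):
    print(b.id(), b.list())   # expected: 'toy:0' [0,1,2,3], 'toy:1' [4,5,6,7], 'toy:2' [8,9]
```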
def partition(self, trainfraction=0.9, valfraction=0.1, testfraction=0, trainsuffix=':train', valsuffix=':val', testsuffix=':test')
-
Partition the dataset into the requested (train,val,test) fractions.
Args
- trainfraction [float]: fraction of dataset for training set
- valfraction [float]: fraction of dataset for validation set
- testfraction [float]: fraction of dataset for test set
trainsuffix
- If not None, append this string to the trainset ID
valsuffix
- If not None, append this string to the valset ID
testsuffix
- If not None, append this string to the testset ID
Returns:
(trainset, valset, testset) such that trainset is the first trainfraction of the dataset.
Note: This does not permute the dataset. To randomize the split, shuffle the dataset first
Expand source code Browse git
def partition(self, trainfraction=0.9, valfraction=0.1, testfraction=0, trainsuffix=':train', valsuffix=':val', testsuffix=':test'): """Partition the dataset into the requested (train,val,test) fractions. Args: trainfraction [float]: fraction of dataset for training set valfraction [float]: fraction of dataset for validation set testfraction [float]: fraction of dataset for test set trainsuffix: If not None, append this string the to trainset ID valsuffix: If not None, append this string the to valset ID testsuffix: If not None, append this string the to testset ID Returns: (trainset, valset, testset) such that trainset is the first trainfraction of the dataset. .. note:: This does not permute the dataset. To randomize split, shuffle dataset first """ assert trainfraction >=0 and trainfraction <= 1, "invalid training set fraction '%f'" % trainfraction assert valfraction >=0 and valfraction <= 1, "invalid validation set fraction '%f'" % valfraction assert testfraction >=0 and testfraction <= 1, "invalid test set fraction '%f'" % testfraction assert abs(trainfraction + valfraction + testfraction - 1) < 1E-6, "fractions must sum to one" idx = self.index() (testidx, validx, trainidx) = dividelist(idx, (testfraction, valfraction, trainfraction)) trainset = self.clone().index(trainidx) if trainsuffix and trainset.id(): trainset.id(trainset.id() + trainsuffix) valset = self.clone().index(validx) if valsuffix and valset.id(): valset.id(valset.id() + valsuffix) testset = self.clone().index(testidx) if testsuffix and testset.id(): testset.id(testset.id() + testsuffix) return (trainset,valset,testset) if testfraction!=0 else (trainset, valset)
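Since `partition` does not permute the dataset, a shuffle is typically applied first. A minimal sketch on toy data (exact split sizes depend on the rounding in dividelist):
```python
import vipy.dataset

D = vipy.dataset.Dataset(list(range(100)), id='toy')
D = D.shuffle(vipy.dataset.Dataset.uniform_shuffler)      # partition() preserves order, so shuffle first
(trainset, valset, testset) = D.partition(0.8, 0.1, 0.1)
print(trainset.id(), len(trainset))                       # 'toy:train', roughly 80 elements
print(valset.id(), len(valset))                           # 'toy:val', roughly 10 elements
print(testset.id(), len(testset))                         # 'toy:test', roughly 10 elements
```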
def pipeline(self, n, m, ragged=True, prepad=True, postpad=True)
-
Yield pipelined minibatches of size n with pipeline length m.
A pipelined minibatch is a tuple (head, tail) such that (head, tail) are minibatches at different indexes in the dataset.
Head corresponds to the current minibatch and tail corresponds to the minibatch left shifted by (m-1) minibatches. This structure is useful for yielding datasets for pipelined training, where head contains the minibatch that will complete pipeline training on this iteration and tail contains the next minibatch to be inserted into the pipeline on this iteration.
```python
D = vipy.dataset.Dataset(...)
for (head, tail) in D.pipeline(n, m, prepad=False, postpad=False):
    assert head == D[0:m]
    assert tail == D[n*(m-1): n*(m-1)+n]
```
Args
- n [int]: The size of each minibatch
- m [int]: The pipeline length in minibatches
- ragged [bool]: If ragged=true, then the last chunk will be ragged with len(chunk)<n, else skipped
prepad
- If true, yield (head, tail) == (None, batch) when filling the pipeline
postpad
- If true, yield (head, tail) == (batch, None) when flushing the pipeline
Returns:
Iterator over tuples (head, tail) of `vipy.dataset.Dataset` elements of length n where tail is left shifted by n*(m-1) elements.
Note: The distributed iterator is not order preserving over minibatches and yields minibatches as completed, however the tuple (head, tail) is order preserving within the pipeline
Note: If there exists a vipy.parallel.executor(), then loading and preprocessing will be performed concurrently
Expand source code Browse git
def pipeline(self, n, m, ragged=True, prepad=True, postpad=True): """Yield pipelined minibatches of size n with pipeline length m. A pipelined minibatch is a tuple (head, tail) such that (head, tail) are minibatches at different indexes in the dataset. Head corresponds to the current minibatch and tail corresponds to the minibatch left shifted by (m-1) minibatches. This structure is useful for yielding datasets for pipelined training where head contains the minibatch that will complete pipeline training on this iteration, and tail contains the next minibatch to be inserted into the pipeline on this iteration. ```python D = vipy.dataset.Dataset(...) for (head, tail) in D.pipeline(n, m, prepad=False, postpad=False): assert head == D[0:m] assert tail == D[n*(m-1): n*(m-1)+n] Args: n [int]: The size of each minibatch m [int]: The pipeline length in minibatches ragged [bool]: If ragged=true, then the last chunk will be ragged with len(chunk)<n, else skipped prepad: If true, yield (head, tail) == (None, batch) when filling the pipeline postpad: If true, yield (head, tail) == (batch, None) when flushing the pipeline Returns: Iterator over tuples (head,tail) of `vipy.dataset.Dataset` elements of length n where tail is left shifted by n*(m-1) elements. .. note:: The distributed iterator is not order preserving over minibatches and yields minibatches as completed, however the tuple (head, tail) is order preserving within the pipeline .. note:: If there exists a vipy.parallel.executor(), then loading and preprocessing will be performed concurrently """ pipeline = [] for (k,b) in enumerate(self.minibatch(n, ragged=ragged)): # not order preserving pipeline.append(b) # order preserving within pipeline if k < m-1: if prepad: yield( (None, b) ) else: yield( (pipeline.pop(0), b) ) # yield deque-like (minibatch, shifted minibatch) tuples for p in pipeline: if postpad: yield( (p, None) )
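A minimal sketch of the (head, tail) structure with prepad and postpad enabled, using toy data with n=4 and m=2 (expected output assumes serial, order-preserving iteration):
```python
import vipy.dataset

D = vipy.dataset.Dataset(list(range(12)), id='toy')
for (head, tail) in D.pipeline(4, 2):
    # head: minibatch completing the pipeline this iteration (None while filling)
    # tail: next minibatch entering the pipeline (None while flushing)
    print(head.list() if head is not None else None,
          tail.list() if tail is not None else None)
# expected: (None, [0,1,2,3]), ([0,1,2,3], [4,5,6,7]), ([4,5,6,7], [8,9,10,11]), ([8,9,10,11], None)
```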
def raw(self)
-
Return a view of this dataset without the loader
Expand source code Browse git
def raw(self): """Return a view of this dataset without the loader""" return Dataset(self._ds, loader=None)
def repeat(self, n)
-
Repeat the dataset n times. If n=0, the dataset is unchanged, if n=1 the dataset is doubled in length, etc.
Expand source code Browse git
def repeat(self, n): """Repeat the dataset n times. If n=0, the dataset is unchanged, if n=1 the dataset is doubled in length, etc.""" assert n>=0 return self.index( self.index()*(n+1) )
def sample(self)
-
Return a single element sampled uniformly at random
Expand source code Browse git
def sample(self): """Return a single element sampled uniformly at random""" return self.takeone()
def set(self, mapper=None, flatten=False)
-
Return the dataset as a set. Mapper must be a lambda function that returns a hashable type
Expand source code Browse git
def set(self, mapper=None, flatten=False): """Return the dataset as a set. Mapper must be a lambda function that returns a hashable type""" return self.tuple(mapper=mapper, reducer=set, flatten=flatten)
def shift(self, m)
-
Circular shift the dataset m elements to the left, so that self[k+m] == self.shift(m)[k]. Circular shift for boundary handling so that self.shift(m)[-1] == self[m-1]
Expand source code Browse git
def shift(self, m): """Circular shift the dataset m elements to the left, so that self[k+m] == self.shift(m)[k]. Circular shift for boundary handling so that self.shift(m)[-1] == self[m-1]""" return self.clone().index(self.index()[m:] + self.index()[0:m])
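A minimal sketch of the circular shift semantics on toy data:
```python
import vipy.dataset

D = vipy.dataset.Dataset(list(range(6)), id='toy')
S = D.shift(2)
assert S[0] == D[2]      # left shifted by two
assert S[-1] == D[1]     # circular boundary handling, self.shift(m)[-1] == self[m-1]
```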
def shuffle(self, shuffler=None)
-
Permute elements in this dataset uniformly at random in place using the optimal shuffling strategy for the dataset structure to maximize performance. This method will use either Dataset.streaming_shuffler (for iterable datasets) or Dataset.uniform_shuffler (for random access datasets)
Expand source code Browse git
def shuffle(self, shuffler=None): """Permute elements in this dataset uniformly at random in place using the optimal shuffling strategy for the dataset structure to maximize performance. This method will use either Dataset.streaming_shuffler (for iterable datasets) or Dataset.uniform_shuffler (for random access datasets) """ assert shuffler is None or callable(shuffler) shuffler = shuffler if shuffler is not None else (Dataset.streaming_shuffler if self.is_streaming() else Dataset.uniform_shuffler) return shuffler(self)
def slice(self, start=0, stop=-1, step=1)
-
Slice the dataset to contain elements defined by slice(start, stop, step)
Expand source code Browse git
def slice(self, start=0, stop=-1, step=1): """Slice the dataset to contain elements defined by slice(start, stop, step)""" return self.clone().index(self.index()[start:stop:step])
def sort(self, f)
-
Sort the dataset in-place using the sortkey lambda function f
To perform a sort of the dataset using some property of the instance, such as the object category (e.g. for vipy.image.ImageCategory)
dataset.sort(lambda im: im.category())
Expand source code Browse git
def sort(self, f): """Sort the dataset in-place using the sortkey lambda function f To perform a sort of the dataset using some property of the instance, such as the object category (e.g. for vipy.image.ImageCategory) ```python dataset.sort(lambda im: im.category()) ``` """ idx = self.index() return self.index( [idx[j] for (j,x) in sorted(zip(range(len(self)), self.tuple(f)), key=lambda x: x[1])] )
def split(self, size)
-
Split the dataset into two datasets, one of length size, the other of length len(self)-size
Expand source code Browse git
def split(self, size): """Split the dataset into two datasets, one of length size, the other of length len(self)-size""" assert isinstance(size, int) and size>=0 and size<len(self) return self.partition(size/len(self), (len(self)-size)/len(self), 0, '', '', '')
def streaming_map(self, mapper, accepter=None, bufsize=1024)
-
Returns a generator that will apply the mapper and yield only those elements that return True from the accepter. Performs the map in parallel if used in the vipy.globals.parallel context manager
Expand source code Browse git
def streaming_map(self, mapper, accepter=None, bufsize=1024): """Returns a generator that will apply the mapper and yield only those elements that return True from the accepter. Performs the map in parallel if used in the vipy.globals.parallel context manager""" return vipy.parallel.iter(self, mapper=mapper, accepter=accepter, bufsize=bufsize)
def take(self, n, inplace=False)
-
Randomly take n elements from the dataset, and return a dataset (in-place or cloned). If n is greater than the size of the dataset, sample with replacement; if n is less than or equal to the size of the dataset, sample without replacement
Expand source code Browse git
def take(self, n, inplace=False): """Randomly Take n elements from the dataset, and return a dataset (in-place or cloned). If n is greater than the size of the dataset, sample with replacement, if n is less than the size of the dataset, sample without replacement""" assert isinstance(n, int) and n>0 D = self.clone() if not inplace else self return D.index(list((random.sample if n<= len(self) else random.choices)(D.index(), k=n)) )
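A minimal sketch of sampling with and without replacement on toy data:
```python
import vipy.dataset

D = vipy.dataset.Dataset(list(range(5)), id='toy')
print(D.take(3).list())    # three elements sampled without replacement (n <= len(D))
print(D.take(10).list())   # ten elements sampled with replacement (n > len(D))
print(D.takeone())         # a single element sampled uniformly at random
```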
def take_fraction(self, p, inplace=False)
-
Randomly take a fraction p of the dataset, returning a clone or modifying in-place
Expand source code Browse git
def take_fraction(self, p, inplace=False): """Randomly take a percentage of the dataset, returning a clone or in-place""" assert p>=0 and p<=1, "invalid fraction '%s'" % p return self.take(n=int(len(self)*p), inplace=inplace)
def takeby(self, f, n)
-
Filter the dataset with the callable f and take n elements from the filtered result. The callable should return bool. If n==1, return a singleton element
Expand source code Browse git
def takeby(self, f, n): """Filter the dataset according to the callable f, take n from each group and return a dataset. Callable should return bool. If n==1, return a singleton""" d = self.clone().filter(f) return d.take(n) if n>1 else d.takeone()
def takelist(self, n)
-
Take n elements and return list. The elements are loaded and not cloned.
Expand source code Browse git
def takelist(self, n): """Take n elements and return list. The elements are loaded and not cloned.""" return self.take(n).list()
def takeone(self)
-
Randomly take one element from the dataset and return a singleton
Expand source code Browse git
def takeone(self): """Randomly take one element from the dataset and return a singleton""" return self[random.randint(0, len(self)-1)]
def truncate(self, m)
-
Truncate the dataset to contain the first m elements only
Expand source code Browse git
def truncate(self, m): """Truncate the dataset to contain the first m elements only""" return self.slice(stop=m)
def tuple(self, mapper=None, flatten=False, reducer=None)
-
Return the dataset as a lazily evaluated sequence, applying the optional mapper lambda to each element, flattening sequences returned by the mapper if flatten=True, and applying the optional reducer lambda to the final sequence; if no reducer is given, a generator is returned
Expand source code Browse git
def tuple(self, mapper=None, flatten=False, reducer=None): """Return the dataset as a tuple, applying the optional mapper lambda on each element, applying optional flattener on sequences returned by mapper, and applying the optional reducer lambda on the final tuple, return a generator""" assert mapper is None or callable(mapper) assert reducer is None or callable(reducer) mapped = self.map(mapper) if mapper else self flattened = (y for x in mapped for y in x) if flatten else (x for x in mapped) reduced = reducer(flattened) if reducer else flattened return reduced
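For example, `tuple`, `list` and `set` share the same mapper/reducer machinery. A minimal sketch over toy (label, value) pairs:
```python
import vipy.dataset

D = vipy.dataset.Dataset([('cat', 1), ('dog', 2), ('cat', 3)], id='pairs')
print(D.set(lambda x: x[0]))                   # {'cat', 'dog'}
print(D.list(lambda x: x[1]))                  # [1, 2, 3]
print(D.tuple(lambda x: x[1], reducer=sum))    # 6
```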
def zip(self, iter)
-
Returns a new dataset constructed from the element tuples of zip(self, iter)
Expand source code Browse git
def zip(self, iter): """Returns a new dataset constructed by applying the callable on elements from zip(self,iter)""" return Dataset(zip(self,iter))
class Paged (pagelist, loader, id=None, strict=True, index=None, cachesize=32)
-
Paged dataset.
A paged dataset is a dataset of length N=M*P constructed from M archive files (the pages) each containing P elements (the pagesize).
The paged dataset must be constructed with tuples of (pagesize, filename).
The loader will fetch, load and cache the pages on demand, preserving the most recently used cachesize pages.
D = vipy.dataset.Paged([(64, 'archive1.pkl'), (64, 'archive2.pkl')], lambda x,y: vipy.load(y))
.. note :: Shuffling this dataset is biased. Shuffling will be performed to mix the indexes, but not uniformly at random. The goal is to preserve data locality to minimize cache misses.
Expand source code Browse git
class Paged(Dataset): """ Paged dataset. A paged dataset is a dataset of length N=M*P constructed from M archive files (the pages) each containing P elements (the pagesize). The paged dataset must be constructed with tuples of (pagesize, filename). The loader will fetch, load and cache the pages on demand using the loader, preserving the most recently used cachesize pages ```python D = vipy.dataset.Paged([(64, 'archive1.pkl'), (64, 'archive2.pkl')], lambda x,y: ivy.load(y)) ``` .. note :: Shuffling this dataset is biased. Shuffling will be performed to mix the indexes, but not uniformly at random. The goal is to preserve data locality to minimize cache misses. """ def __init__(self, pagelist, loader, id=None, strict=True, index=None, cachesize=32): super().__init__(dataset=pagelist, id=id, loader=loader).index(index if index else list(range(sum([p[0] for p in pagelist])))) assert callable(loader), "page loader required" assert not strict or len(set([x[0] for x in self._ds])) == 1 # pagesizes all the same self._cachesize = cachesize self._pagecache = {} self._ds = list(self._ds) self._pagesize = self._ds[0][0] # (pagesize, pklfile) tuples def shuffle(self, shuffler=None): """Permute elements while preserve page locality to minimize cache misses""" shuffler = shuffler if shuffler is not None else functools.partial(Paged.chunk_shuffler, chunksize=int(1.5*self._pagesize)) return shuffler(self) def __getitem__(self, k): if isinstance(k, (int, np.uint64)): assert abs(k) < len(self._idx), "invalid index" page = self._idx[int(k)] // self._pagesize if page not in self._pagecache: self._pagecache[page] = self._loader(*self._ds[page]) # load and cache new page if len(self._pagecache) > self._cachesize: self._pagecache.pop(list(self._pagecache.keys())[0]) # remove oldest x = self._pagecache[page][int(k) % self._pagesize] return x elif isinstance(k, slice): return [self[i] for i in range(len(self))[k.start if k.start else 0:k.stop if k.stop else len(self):k.step if k.step else 1]] # expensive else: raise ValueError('invalid index type "%s"' % type(k)) def flush(self): self._pagecache = {} return self @staticmethod def chunk_shuffler(D, chunker, chunksize=64): """Split dataset into len(D)/chunksize non-overlapping chunks with some common property returned by chunker, shuffle chunk order and shuffle within chunks. - If chunksize=1 then this is equivalent to uniform_shuffler - chunker must be a callable of some property that is used to group into chunks """ assert callable(chunker) return D.randomize().sort(chunker).index([i for I in shufflelist([shufflelist(I) for I in chunkgenbysize(D.index(), chunksize)]) for i in I])
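A minimal sketch of constructing and indexing a paged dataset, assuming two hypothetical pickle archives of 64 elements each (the filenames and page size are placeholders):
```python
import vipy
import vipy.dataset

pages = [(64, 'archive1.pkl'), (64, 'archive2.pkl')]             # hypothetical (pagesize, filename) tuples
D = vipy.dataset.Paged(pages, lambda n, f: vipy.load(f), cachesize=8)
x = D[100]    # loads and caches the page 'archive2.pkl', then returns element 100 % 64 of that page
```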
Ancestors
- Dataset
Static methods
def chunk_shuffler(D, chunker, chunksize=64)
-
Split dataset into len(D)/chunksize non-overlapping chunks with some common property returned by chunker, shuffle chunk order and shuffle within chunks.
- If chunksize=1 then this is equivalent to uniform_shuffler
- chunker must be a callable of some property that is used to group into chunks
Expand source code Browse git
@staticmethod def chunk_shuffler(D, chunker, chunksize=64): """Split dataset into len(D)/chunksize non-overlapping chunks with some common property returned by chunker, shuffle chunk order and shuffle within chunks. - If chunksize=1 then this is equivalent to uniform_shuffler - chunker must be a callable of some property that is used to group into chunks """ assert callable(chunker) return D.randomize().sort(chunker).index([i for I in shufflelist([shufflelist(I) for I in chunkgenbysize(D.index(), chunksize)]) for i in I])
Methods
def flush(self)
-
Expand source code Browse git
def flush(self): self._pagecache = {} return self
def shuffle(self, shuffler=None)
-
Permute elements while preserving page locality to minimize cache misses
Expand source code Browse git
def shuffle(self, shuffler=None): """Permute elements while preserve page locality to minimize cache misses""" shuffler = shuffler if shuffler is not None else functools.partial(Paged.chunk_shuffler, chunksize=int(1.5*self._pagesize)) return shuffler(self)
Inherited members
Dataset
balanced
batch
chunk
chunks
clone
count
even_split
filter
frequency
from_directory
from_image_urls
groupby
id
identity_shuffler
index
inverse_frequency
list
load
localmap
map
minibatch
partition
pipeline
raw
repeat
sample
set
shift
slice
sort
split
streaming_map
streaming_shuffler
take
take_fraction
takeby
takelist
takeone
truncate
tuple
uniform_shuffler
zip
class Union (*args, **kwargs)
-
vipy.dataset.Union() class
Common class to manipulate groups of vipy.dataset.Dataset objects in parallel
Usage
>>> cifar10 = vipy.dataset.registry('cifar10')
>>> mnist = vipy.dataset.registry('mnist')
>>> dataset = vipy.dataset.Union(mnist, cifar10)
>>> dataset = mnist | cifar10
Args
Datasets
Expand source code Browse git
class Union(Dataset): """vipy.dataset.Union() class Common class to manipulate groups of vipy.dataset.Dataset objects in parallel Usage: >>> cifar10 = vipy.dataset.registry('cifar10') >>> mnist = vipy.dataset.registry('mnist') >>> dataset = vipy.dataset.Union(mnist, cifar10) >>> dataset = mnist | cifar10 Args: Datasets """ __slots__ = ('_id', '_ds', '_idx', '_loader', '_type') def __init__(self, *args, **kwargs): assert all(isinstance(d, (Dataset, )) for d in args), "invalid datasets" datasets = [d for d in args] # order preserving assert all([isinstance(d, Dataset) for d in datasets]), "Invalid datasets '%s'" % str([type(d) for d in datasets]) datasets = [j for i in datasets for j in (i.datasets() if isinstance(i, Union) else (i,))] # flatten unions self._ds = datasets self._idx = None self._id = kwargs['id'] if 'id' in kwargs else None self._loader = None # individual datasets have loaders self._type = None def is_streaming(self): return self._idx is None and all(d.is_streaming() for d in self.datasets()) def __len__(self): return sum(d.__len__() for d in self.datasets()) if self._idx is None else len(self._idx) def __iter__(self): if self.is_streaming(): k = -1 iter = [d.__iter__() for d in self.datasets()] # round-robin for m in range(len(self.datasets())): try: while True: k = (k + 1) % len(iter) yield next(iter[k]) # assumes ordered except StopIteration: iter.pop(k) k -= 1 else: self.index() # force random access for (i,j) in self._idx: yield self._ds[i][j] # random access (slower) def __getitem__(self, k): self.index() # force random access if isinstance(k, (int, np.uint64)): assert abs(k) < len(self._idx), "invalid index" (i,j) = self._idx[int(k)] return self._ds[i][j] elif isinstance(k, slice): return [self._ds[i][j] for (i,j) in self._idx[k.start:k.stop:k.step]] else: raise ValueError('invalid index type "%s"' % type(k)) def __repr__(self): fields = ['id=%s' % truncate_string(self.id(), maxlen=64)] if self.id() else [] fields += ['len=%d' % len(self)] fields += ['union=%s' % str(tuple([truncate_string(d.id(), maxlen=80) for d in self._ds]))] return str('<vipy.dataset.Dataset: %s>' % (', '.join(fields))) def index(self, index=None, strict=False): """Update the index, useful for filtering of large datasets""" if index is not None: self._idx = index return self if self._idx is None: # Index on-demand: zipped (dataset index, element index) tuples, in round-robin dataset order [(0,0),(1,0),...,(0,n),(1,n),...] lengths = [len(d) for d in self.datasets()] self._idx = [c for r in [[(i,j) for i in range(len(self.datasets()))] for j in range(max(lengths))] for c in r if c[1]<lengths[c[0]]] return self._idx def clone(self, deep=False): """Return a copy of the dataset object""" D = super().clone(deep=deep) D._ds = [d.clone(deep=deep) for d in D._ds] return D def datasets(self): """Return the dataset union elements, useful for generating unions of unions""" return list(self._ds) def shuffle(self, shuffler=None): """Permute elements in this dataset uniformly at random in place using the best shuffler for the dataset structure""" shuffler = shuffler if shuffler is not None else (Union.streaming_shuffler if self.is_streaming() else Dataset.uniform_shuffler) return shuffler(self) @staticmethod def streaming_shuffler(D): """A uniform shuffle (approximation) on the dataset elements for iterable access only""" assert D._idx is None, "iterable dataset only" D._ds = [Dataset.streaming_shuffler(d) for d in D._ds] # shuffle dataset shards random.shuffle(D._ds) # shuffle union order return D
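A minimal sketch of round-robin iteration and indexing over a union of two toy datasets:
```python
import vipy.dataset

A = vipy.dataset.Dataset([0, 1, 2, 3], id='a')
B = vipy.dataset.Dataset(['w', 'x', 'y', 'z'], id='b')
U = A | B                      # same as vipy.dataset.Union(A, B)
print(len(U))                  # 8
print(U[0], U[1], U[2], U[3])  # round-robin order: 0 w 1 x
```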
Ancestors
- Dataset
Methods
def datasets(self)
-
Return the dataset union elements, useful for generating unions of unions
Expand source code Browse git
def datasets(self): """Return the dataset union elements, useful for generating unions of unions""" return list(self._ds)
def is_streaming(self)
-
Expand source code Browse git
def is_streaming(self): return self._idx is None and all(d.is_streaming() for d in self.datasets())
def shuffle(self, shuffler=None)
-
Permute elements in this dataset uniformly at random in place using the best shuffler for the dataset structure
Expand source code Browse git
def shuffle(self, shuffler=None): """Permute elements in this dataset uniformly at random in place using the best shuffler for the dataset structure""" shuffler = shuffler if shuffler is not None else (Union.streaming_shuffler if self.is_streaming() else Dataset.uniform_shuffler) return shuffler(self)
Inherited members
Dataset
balanced
batch
chunk
chunks
clone
count
even_split
filter
frequency
from_directory
from_image_urls
groupby
id
identity_shuffler
index
inverse_frequency
list
load
localmap
map
minibatch
partition
pipeline
raw
repeat
sample
set
shift
slice
sort
split
streaming_map
streaming_shuffler
take
take_fraction
takeby
takelist
takeone
truncate
tuple
uniform_shuffler
zip