datastreams¶
Efficient, concise stream data processing.
>>> from datastreams import DataStream
>>> DataStream("Hello, gorgeous.")\
... .filter(lambda char: char.isalpha())\
... .map_method('lower')\
... .count_frequency().to_list()
[('e', 2), ('g', 2), ('h', 1), ('l', 2), ('o', 3), ('s', 1), ('r', 1), ('u', 1)]
Overview¶
Pipelining¶
DataStreams are perfect for pipelined feature calculation:
from datetime import datetime

def calc_name(user):
    user.first_name = user.name.split(' ')[0] if user.name else ''
    return user

def calc_age(user):
    user.age = datetime.now() - user.birthday
    return user

DataStream(users)\
    .map(calc_name)\
    .map(calc_age)\
    .for_each(User.save)\
    .execute()  # <- Nothing happens till here
These calculations are efficient because streams are generators by default, so no memory is wasted on unnecessary intermediate collections.
This pays off most when calculation steps are long and must be wrapped in their own functions. For short steps, you can use set for brevity:
DataStream(users)\
.set('first_name', lambda user: user.name.split(' ')[0] if user.name else '')\
.set('age', lambda user: datetime.now() - user.birthday)\
.for_each(User.save)\
.execute()
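The lazy evaluation described above can be illustrated without the library. The following is a minimal sketch using plain Python generators, not DataStream's actual implementation; `lazy_map` and `log` are hypothetical names introduced only for this example. It shows that building a chain of steps does no work until something consumes it.

```python
# Sketch only: plain generators standing in for a lazy stream pipeline.
log = []  # records the order in which rows are actually processed

def lazy_map(function, rows):
    """Yield function(row) for each row, one row at a time."""
    for row in rows:
        log.append(("map", row))
        yield function(row)

# Building the pipeline runs nothing yet.
pipeline = lazy_map(lambda n: n * 5, lazy_map(lambda n: n + 1, range(3)))
assert log == []

# Consuming the pipeline pulls each row through every step in turn.
result = list(pipeline)
print(result)  # [5, 10, 15]
```

Each row passes through both steps before the next row is read, so no intermediate list is ever built.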
Joins¶
You can join DataStreams, even streams of objects!
userstream = DataStream(users)
transactstream = DataStream(transactions)
user_spend = userstream.join('inner', 'user_id', transactstream)\
.group_by('user_id')\
.map(lambda usertrans: (usertrans[0], sum(tran.price for tran in usertrans[1])))\
.to_dict()
# {'47328129': 240.95, '48190234': 40.73, ...}
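To make the join-then-aggregate idea concrete, here is a stdlib-only sketch of an inner join followed by a per-user sum. It assumes rows are objects with `user_id` and `price` attributes; the `User` and `Transaction` namedtuples, the `inner_join` helper, and the sample values are made up for illustration and say nothing about how DataStream implements `join` internally.

```python
from collections import defaultdict, namedtuple

# Hypothetical row types and sample data, for illustration only.
User = namedtuple("User", ["user_id", "name"])
Transaction = namedtuple("Transaction", ["user_id", "price"])

users = [User("1", "amy"), User("2", "brad"), User("3", "cy")]
transactions = [
    Transaction("1", 10.0),
    Transaction("1", 2.5),
    Transaction("2", 4.0),
    Transaction("9", 99.0),  # no matching user: dropped by an inner join
]

def inner_join(key, left, right):
    """Yield (left_row, right_row) pairs whose `key` attributes match."""
    index = defaultdict(list)
    for row in right:
        index[getattr(row, key)].append(row)
    for row in left:
        for match in index.get(getattr(row, key), []):
            yield row, match

# Sum transaction prices per user over the joined pairs.
user_spend = defaultdict(float)
for user, tran in inner_join("user_id", users, transactions):
    user_spend[user.user_id] += tran.price

print(dict(user_spend))  # {'1': 12.5, '2': 4.0}
```

Indexing the right side by key first keeps the join linear in the size of both inputs, at the cost of holding the right side in memory.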
where Clause¶
Chained filters are a bit tiresome. where lets you perform simple filtering using more accessible language:
DataStream(users)\
.where('age').gteq(18)\
.where('age').lt(35)\
.where('segment').is_in(target_segments)\
.for_each(do_something).execute()
Instead of:
DataStream(users)\
.filter(lambda user: user.age >= 18)\
.filter(lambda user: user.age < 35)\
.filter(lambda user: user.segment in target_segments)\
.for_each(do_something).execute()
I bet you got tired just reading that many lambdas!
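One way to picture how a `where`-style call relates to a plain filter is the sketch below. The `Where` class is a hypothetical stand-in written for this example (it is not the library's `FilterRadix`), and the sample users are invented; the point is only that each condition method builds an ordinary predicate on the named attribute.

```python
import operator
from collections import namedtuple

class Where:
    """Hypothetical helper: filters rows by a condition on one attribute."""

    def __init__(self, rows, name):
        self.rows = rows
        self.name = name

    def _compare(self, op, value):
        # Lazily keep rows whose attribute satisfies op(attribute, value).
        return (row for row in self.rows if op(getattr(row, self.name), value))

    def gteq(self, value):
        return self._compare(operator.ge, value)

    def lt(self, value):
        return self._compare(operator.lt, value)

    def is_in(self, values):
        return self._compare(lambda attr, vals: attr in vals, values)

# Invented sample data.
User = namedtuple("User", ["name", "age", "segment"])
users = [User("amy", 17, "a"), User("brad", 25, "b"),
         User("cy", 30, "c"), User("di", 40, "a")]

adults = Where(users, "age").gteq(18)
young_adults = Where(adults, "age").lt(35)
targeted = list(Where(young_adults, "segment").is_in({"a", "b"}))
print([u.name for u in targeted])  # ['brad']
```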
Apache Spark Integration¶
Integrating with Apache Spark is easy: just use RddStream instead of DataStream or DataSet, and pass it an RDD. The rest of the API is the same!
RddStream(myrdd)\
.where('age').gteq(18)\
.where('age').lt(35)\
.where('segment').is_in(target_segments)\
.for_each(do_something).execute()
DataStream API¶
-
class datastreams.DataStream(source, transform=<function <lambda>>, predicate=<function <lambda>>)[source]¶
Foundation for the package: DataStream allows you to chain map/filter/reduce/etc. style operations together:
>>> stream = DataStream(range(10))
>>> stream.filter(lambda n: n % 2 == 0).map(lambda n: n*5).to_list()
... [0, 10, 20, 30, 40]
DataStreams are evaluated lazily (using generators), providing memory efficiency and speed. Using collect() produces a DataSet, which evaluates the whole stream and caches the result.
-
batch(batch_size)[source]¶
Batches rows of a stream in a given chunk size
>>> DataStream(range(10)).batch(2).to_list()
... [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
Parameters: batch_size (int) – size of each batch
Return type: DataStream
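As a rough picture of what batching a lazy stream involves, the sketch below chunks any iterable with `itertools.islice`. The standalone `batch` function is an assumption made for illustration, not the library's code; in particular, whether `DataStream.batch` emits a short final batch is not stated here, while this sketch does.

```python
from itertools import islice

def batch(rows, batch_size):
    """Lazily yield lists of up to batch_size consecutive rows."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk

print(list(batch(range(10), 2)))  # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
print(list(batch(range(5), 2)))   # [[0, 1], [2, 3], [4]]
```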
-
chain()[source]¶
Chains together iterables, flattening them
>>> DataStream(['this', 2, None]).map(dir).chain().to_list()
... ['__add__', '__class__', '__contains__', '__delattr__', ...]
Return type: DataStream
-
collect()[source]¶
Collects the stream into a DataSet
>>> DataStream(range(5)).map(lambda n: n * 5).collect()
... DataSet([0, 5, 10, 15, 20])
Return type: DataSet
-
collect_as(constructor)[source]¶
Collects using a constructor
>>> DataStream(range(5)).collect_as(str)
... DataSet(['0', '1', '2', '3', '4'])
Parameters: constructor – class or constructor function
Return type: DataSet
-
concat()[source]¶
Alias for chain()
>>> DataStream(['this', 2, None]).map(dir).concat().to_list()
... ['__add__', '__class__', '__contains__', '__delattr__', ...]
Return type: DataStream
-
concat_map(function)[source]¶
map() a function over the stream, then concat it
>>> DataStream(['this', 2, None]).concat_map(dir).to_list()
... ['__add__', '__class__', '__contains__', '__delattr__', ...]
Parameters: function (function) – function to apply
Return type: DataStream
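The map-then-flatten behaviour can be sketched with the standard library alone. The free function `concat_map` below is a hypothetical equivalent written for this example, not the method's actual source.

```python
from itertools import chain

def concat_map(function, rows):
    """Apply function to each row, then flatten the resulting iterables."""
    return chain.from_iterable(map(function, rows))

words = list(concat_map(str.split, ["to be", "or not", "to be"]))
print(words)  # ['to', 'be', 'or', 'not', 'to', 'be']
```

Both `map` and `chain.from_iterable` are lazy, so the flattened rows are produced one at a time.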
-
count()[source]¶
Counts the number of rows in this stream. This will exhaust a stream!
>>> DataStream(range(5)).count()
... 5
Return type: int
-
count_frequency()[source]¶
Counts frequency of each row in the stream
>>> DataStream(['a', 'a', 'b', 'c']).count_frequency()
... DataSet([('a', 2), ('b', 1), ('c', 1)])
Return type: DataSet
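Frequency counting has to see every row before it can report a result, which fits the DataSet return type. A minimal stdlib sketch of the same idea follows; the standalone `count_frequency` function is an illustration only, and the ordering of pairs produced by the library may differ from what `collections.Counter` gives.

```python
from collections import Counter

def count_frequency(rows):
    """Return (row, count) pairs; rows must be hashable."""
    return list(Counter(rows).items())

pairs = count_frequency(["a", "a", "b", "c"])
print(sorted(pairs))  # [('a', 2), ('b', 1), ('c', 1)]
```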
-
dedupe(key_fn=<function <lambda>>)[source]¶
Removes duplicates from a stream, returning only unique values.
>>> DataStream('aaaabcccddd').dedupe().to_list()
... ['a', 'b', 'c', 'd']
Parameters: key_fn (function) – function returning a hashable value used to determine uniqueness
Returns: DataStream
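The role of `key_fn` is easiest to see in a small sketch. The generator below is a hypothetical stand-in, assuming the default key function is the identity (the signature only shows an anonymous lambda) and that the first row seen for each key is the one kept.

```python
def dedupe(rows, key_fn=lambda row: row):
    """Lazily yield the first row seen for each distinct key."""
    seen = set()
    for row in rows:
        key = key_fn(row)
        if key not in seen:
            seen.add(key)
            yield row

print(list(dedupe("aaaabcccddd")))                 # ['a', 'b', 'c', 'd']
print(list(dedupe(["Hi", "hi", "Yo"], str.lower)))  # ['Hi', 'Yo']
```

Because only the keys are remembered, memory grows with the number of distinct keys rather than the number of rows.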
-
drop(n)[source]¶
Drops the first n rows from the stream
>>> DataStream(range(10)).drop(5).to_list()
... [5, 6, 7, 8, 9]
Parameters: n (int) – number of rows to be dropped
Return type: DataStream
-
execute()[source]¶
Evaluates the stream (nothing happens until a stream is evaluated)
>>> from pprint import pprint
>>> DataStream(range(3)).for_each(pprint).execute()
... 0
... 1
... 2
... <datastreams.DataStream at 0x7f6995ea4790>
-
filter(filter_fn)[source]¶
Filters a stream using the passed in predicate function.
>>> DataStream(range(10)).filter(lambda n: n % 2 == 0).to_list()
... [0, 2, 4, 6, 8]
Parameters: filter_fn (function) – only passes values for which filter_fn returns True
Return type: DataStream
-
filter_method(method, *args, **kwargs)[source]¶
Filters by calling the named method on each row with the passed in args/kwargs
>>> DataStream(['hi', 'h1', 'ho']).filter_method('isalpha').to_list()
... ['hi', 'ho']
Parameters: method (str) – name of method to be called
Return type: DataStream
-
filters(filter_fns)[source]¶
Apply a list of filter functions
>>> evens_less_than_six = [lambda n: n < 6, lambda n: n % 2 == 0]
>>> DataStream(range(10)).filters(evens_less_than_six).to_list()
... [0, 2, 4]
Parameters: filter_fns (list[function]) – list of filter functions
Return type: DataStream
-
for_each(function)[source]¶
Calls a function for each row in the stream, but passes the row value through
>>> from pprint import pprint
>>> DataStream(range(3)).for_each(pprint).execute()
... 0
... 1
... 2
... <datastreams.DataStream at 0x7f6995ea4790>
Parameters: function (function) – function to call on each row
Return type: DataStream
-
classmethod from_csv(path, headers=None, constructor=<class 'datastreams.datastreams.Datum'>)[source]¶
Stream rows from a csv file
>>> DataStream.from_csv('payments.csv').to_list()
... [Datum({'name': 'joe', 'charge': 174.93}), Datum({'name': 'sally', 'charge': 198.05}), ...]
Parameters:
- path (str) – path to csv to be streamed
- headers (list[str]) – manual names for headers; if present, the first row is pulled in as data; if None, the first row is used as headers
- constructor – class or function to construct for each row
Return type:
-
classmethod from_file(path)[source]¶
Stream lines from a file
>>> DataStream.from_file('hamlet.txt').concat_map(str.split).take(7).to_list()
... ['The', 'Tragedy', 'of', 'Hamlet,', 'Prince', 'of', 'Denmark']
Parameters: path (str) – path to file to be streamed
Return type: DataStream
-
classmethod from_stdin()[source]¶
Stream rows from stdin
Return type: DataStream
-
get(name, default=None)[source]¶
Gets the named attribute of each row in the stream
>>> from collections import namedtuple
>>> Person = namedtuple('Person', ['name', 'year_born'])
>>> DataStream([Person('amy', 1987), Person('brad', 1980)]).get('year_born').to_list()
... [1987, 1980]
Parameters:
- name (str) – attribute name
- default – default value to use if the named attribute is not found in a row
Return type:
-
group_by(key)[source]¶
Groups a stream by key, returning a DataSet of (K, tuple(V))
>>> stream = DataStream(list(range(3)) * 3)
>>> stream.group_by('real').to_dict()
... {0: (0, 0, 0), 1: (1, 1, 1), 2: (2, 2, 2)}
Parameters: key (str) – attribute name to group by
Return type: DataSet
-
group_by_fn(key_fn)[source]¶
Groups a stream by function, returning a DataSet of (K, tuple(V))
>>> stream = DataStream(['hi', 'hey', 'yo', 'sup'])
>>> stream.group_by_fn(lambda w: len(w)).to_dict()
... {2: ('hi', 'yo'), 3: ('hey', 'sup')}
Parameters: key_fn (function) – key function returning hashable value to group by
Return type: DataSet
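Grouping into `(K, tuple(V))` pairs can be pictured with a dictionary of lists. The function below is a stdlib sketch written for this example, not the library's implementation, and it assumes rows within a group keep their stream order.

```python
from collections import defaultdict

def group_by_fn(key_fn, rows):
    """Return (key, tuple_of_rows) pairs, grouping rows by key_fn(row)."""
    groups = defaultdict(list)
    for row in rows:
        groups[key_fn(row)].append(row)
    return [(key, tuple(values)) for key, values in groups.items()]

grouped = dict(group_by_fn(len, ["hi", "hey", "yo", "sup"]))
print(grouped)  # {2: ('hi', 'yo'), 3: ('hey', 'sup')}
```

Like any grouping, this must read the whole input before returning, which matches the DataSet return type.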
-
inner_join(key, right)[source]¶
Returns a dataset joined using only keys present in both datasets
Parameters:
- right (DataStream) – DataStream to be joined with
- key (str) – attribute name to join on
Return type:
-
inner_join_by(left_key_fn, right_key_fn, right)[source]¶
Returns a dataset joined using key functions to evaluate equality
Parameters:
- left_key_fn (function) – key function that produces a hashable value from left stream
- right_key_fn (function) – key function that produces a hashable value from right stream
- right (DataStream) – DataStream to be joined with
Return type:
-
join(how, key, right)[source]¶
Returns a dataset joined using the join type named by how
Parameters:
- how (str) – left, right, outer, or inner
- right (DataStream) – DataStream to be joined with
- key (str) – attribute name to join on
Return type:
-
join_by(how, left_key_fn, right_key_fn, right)[source]¶
Uses two key functions to perform a join. Key functions should produce hashable types to be used to compare/index dicts.
Parameters:
- how (str) – left, right, outer, or inner
- right (DataStream) – DataStream to be joined with
- left_key_fn (function) – key function that produces a hashable value from left stream
- right_key_fn (function) – key function that produces a hashable value from right stream
Return type:
-
left_join(key, right)[source]¶
Returns a dataset joined using keys from left dataset only
Parameters:
- right (DataStream) – DataStream to be joined with
- key (str) – attribute name to join on
Return type:
-
left_join_by(left_key_fn, right_key_fn, right)[source]¶
Returns a dataset joined using key functions to evaluate equality
Parameters:
- left_key_fn (function) – key function that produces a hashable value from left stream
- right_key_fn (function) – key function that produces a hashable value from right stream
- right (DataStream) – DataStream to be joined with
Return type:
-
map(function)[source]¶
Apply a function to each row in this stream
>>> DataStream(range(5)).map(lambda n: n * 5).to_list()
... [0, 5, 10, 15, 20]
Parameters: function (function) – function to apply
Return type: DataStream
-
map_method(method, *args, **kwargs)[source]¶
Call named method of each row using supplied args/kwargs
>>> DataStream(['hi', 'hey', 'yo']).map_method('upper').to_list()
... ['HI', 'HEY', 'YO']
Parameters: method (str) – name of method to be called
Return type: DataStream
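Calling a method by name on every row corresponds closely to `operator.methodcaller` from the standard library. The sketch below is a hypothetical equivalent written for illustration, showing how extra positional arguments would be forwarded; it is not taken from the library's source.

```python
from operator import methodcaller

def map_method(rows, method, *args, **kwargs):
    """Lazily call the named method on each row with the given arguments."""
    return map(methodcaller(method, *args, **kwargs), rows)

print(list(map_method(["hi", "hey", "yo"], "upper")))  # ['HI', 'HEY', 'YO']
print(list(map_method(["a-b", "c-d"], "split", "-")))  # [['a', 'b'], ['c', 'd']]
```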
-
outer_join(key, right)[source]¶
Returns a dataset joined using keys in either dataset
Parameters:
- right (DataStream) – DataStream to be joined with
- key (str) – attribute name to join on
Return type:
-
outer_join_by(left_key_fn, right_key_fn, right)[source]¶
Returns a dataset joined using key functions to evaluate equality
Parameters:
- left_key_fn (function) – key function that produces a hashable value from left stream
- right_key_fn (function) – key function that produces a hashable value from right stream
- right (DataStream) – DataStream to be joined with
Return type:
-
pick_attrs(attr_names)[source]¶
Picks attributes from each row in a stream. This is helpful for limiting row attrs to only those you want to save in a database, etc.
>>> from collections import namedtuple
>>> Person = namedtuple('Person', ['name', 'year_born'])
>>> DataStream([Person('amy', 1987), Person('brad', 1980)]).pick_attrs(['year_born']).to_list()
... [Datum({'year_born': 1987}), Datum({'year_born': 1980})]
Parameters: attr_names (list[str]) – list of attribute names to keep
Return type: DataStream
-
reduce(function, initial=None)[source]¶
Applies a reducing function to the rows in a stream
Parameters:
- function (function) – reducing function, with parameters last_iteration, next_value
- initial – initial value for reduce; if None, takes the first element of this stream as initial
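The `initial=None` convention described above mirrors `functools.reduce` called without a starting value. The wrapper below is a sketch under that assumption; `reduce_rows` is a hypothetical name, and how the library treats an empty stream with no initial value is not stated here.

```python
from functools import reduce

def reduce_rows(function, rows, initial=None):
    """Fold rows with function(last_iteration, next_value)."""
    if initial is None:
        # No initial value: the first row seeds the accumulator.
        return reduce(function, rows)
    return reduce(function, rows, initial)

total = reduce_rows(lambda last, nxt: last + nxt, range(5))
doubled = reduce_rows(lambda last, nxt: last + [nxt * 2], range(3), [])
print(total)    # 10
print(doubled)  # [0, 2, 4]
```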
-
reduce_to_dataset(function, initial=None)[source]¶
Applies a reducer over this stream, returning a DataSet of the results
Parameters:
- function – reducing function, with parameters last_iteration, next_value
- initial – initial value for reduce; if None, takes the first element of this stream as initial
Return type:
-
right_join(key, right)[source]¶
Returns a dataset joined using keys in right dataset only
Parameters:
- right (DataStream) – DataStream to be joined with
- key (str) – attribute name to join on
Return type:
-
right_join_by(left_key_fn, right_key_fn, right)[source]¶
Returns a dataset joined using key functions to evaluate equality
Parameters:
- left_key_fn (function) – key function that produces a hashable value from left stream
- right_key_fn (function) – key function that produces a hashable value from right stream
- right (DataStream) – DataStream to be joined with
Return type:
-
sample(probability, n)[source]¶
Sample N rows with a given probability of choosing a given row
>>> DataStream(range(100)).sample(0.1, 5)
...
Parameters:
Return type:
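Since the parameters are not documented, the sketch below shows one plausible reading of the description: each row is kept with the given probability, and sampling stops once n rows have been chosen. This interpretation, the standalone `sample` function, and its `rng` argument are all assumptions made for illustration; the library's behaviour may differ.

```python
import random
from itertools import islice

def sample(rows, probability, n, rng=random):
    """Keep each row with the given probability, stopping after n rows."""
    chosen = (row for row in rows if rng.random() < probability)
    return list(islice(chosen, n))

# A seeded generator makes the run repeatable.
picked = sample(range(100), 0.1, 5, rng=random.Random(0))
print(len(picked) <= 5)  # True
```

Under this reading the result can hold fewer than n rows when the stream runs out first.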
-
set(name, transfer_func=None, value=None)[source]¶
Sets the named attribute of each row in the stream using the supplied function
Parameters:
- name – attribute name
- transfer_func – function that takes the row and returns the value to be stored at the named attribute
Return type:
-
take(n)[source]¶
Takes the first n rows from the stream
>>> DataStream(range(100000)).take(3).to_list()
... [0, 1, 2]
Parameters: n (int) – number of rows to be taken
Return type: DataStream
-
take_now(n)[source]¶
Like take, but evaluates immediately and returns a DataSet
>>> DataStream(range(100000)).take_now(3)
... DataSet([0, 1, 2])
Parameters: n (int) – number of rows to be taken
Return type: DataSet
-
to_dict()[source]¶
Converts a stream to a dict
>>> stream = DataStream(['hi', 'hey', 'yo', 'sup'])
>>> stream.group_by_fn(lambda w: len(w)).to_dict()
... {2: ('hi', 'yo'), 3: ('hey', 'sup')}
Return type: dict
-
to_list()[source]¶
Converts a stream to a list
>>> DataStream(range(5)).map(lambda n: n * 5).to_list()
... [0, 5, 10, 15, 20]
Return type: list
-
to_set()[source]¶
Converts a stream to a set
>>> DataStream([1, 2, 3, 4, 2, 3]).to_set()
... {1, 2, 3, 4}
Return type: set
-
where(name=<class 'datastreams.datastreams.Nothing'>)[source]¶
Shorthand for common filter functions: where selects an attribute to be filtered on, with a condition like gt or contains following it.
>>> from collections import namedtuple
>>> Person = namedtuple('Person', ['name', 'year_born'])
>>> DataStream([Person('amy', 1987), Person('brad', 1980)]).where('year_born').gt(1983).to_list()
... [Person(name='amy', year_born=1987)]
Parameters: name (str) – attribute name to filter on
Return type: FilterRadix
-
DataSet API¶
-
class datastreams.DataSet(source)[source]¶
Like a DataStream, but with the source cached as a list. Able to perform tasks that require the whole source, like sorting and reversing.
-
apply(function)[source]¶
Apply a function to the whole dataset
Parameters: function (function) – function to be called on the whole dataset
Return type: DataSet
-
call(function)[source]¶
Call a function with the whole dataset, returning the original
>>> from pprint import pprint
>>> DataSet([1, 2, 3]).call(pprint)
... DataSet([1, 2, 3])
... DataSet([1, 2, 3])
Parameters: function (function) – function to be called on the whole dataset
Return type: DataSet
-
reverse()[source]¶
Reverses a DataSet
>>> DataSet(range(5)).reverse()
... DataSet([4, 3, 2, 1, 0])
Return type: DataSet
-
sort_by(key_fn, descending=True)[source]¶
Sort the DataSet using the given key function
>>> from collections import namedtuple
>>> Person = namedtuple('Person', ['name', 'year_born'])
>>> DataSet([Person('amy', 1987), Person('brad', 1980)]).sort_by(lambda p: p.year_born)
... DataSet([Datum({'name': 'amy', 'year_born': 1980}), Datum({'name': 'brad', 'year_born': 1987})])
Parameters:
- key_fn (function) – function used to select the key used to sort the dataset
- descending (bool) – sorts descending if True
Return type:
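Sorting needs the whole source in memory, which is why it lives on DataSet rather than DataStream. The sketch below shows the key-function idea with the built-in `sorted`; it assumes the `descending` flag corresponds to `sorted`'s `reverse` argument, and the `Person` rows are invented sample data.

```python
from collections import namedtuple

# Invented sample rows.
Person = namedtuple("Person", ["name", "year_born"])
people = [Person("amy", 1987), Person("brad", 1980), Person("cy", 1992)]

def sort_by(rows, key_fn, descending=True):
    """Return rows as a list sorted by key_fn, largest key first by default."""
    return sorted(rows, key=key_fn, reverse=descending)

newest_first = [p.name for p in sort_by(people, lambda p: p.year_born)]
oldest_first = [p.name for p in sort_by(people, lambda p: p.year_born, descending=False)]
print(newest_first)  # ['cy', 'amy', 'brad']
print(oldest_first)  # ['brad', 'amy', 'cy']
```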
-
to_stream()[source]¶
Streams from this dataset
Return type: DataStream
-