5. Spark Streaming, DStreams

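All of the cells in this chapter assume an already-running SparkContext bound to sc, as in a PySpark shell or notebook. Outside such an environment, a minimal setup sketch (master URL and application name are illustrative) could be:

from pyspark import SparkContext

# 'local[2]' keeps at least two threads available, so a socket receiver and the
# batch processing used later in this chapter can run concurrently.
sc = SparkContext('local[2]', 'dstream-examples')
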
5.1. Transformations

5.1.1. Map

[1]:
from pyspark.streaming import StreamingContext
from random import choice

ssc = StreamingContext(sc, 1)  # micro-batch interval of 1 second
ssc.checkpoint('/tmp')         # directory for checkpoint data

alphabets = list('abcdefghijklmnopqrstuvwxyz')
input_data = [[choice(alphabets) for _ in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

# queueStream feeds one RDD from the queue into the stream per batch interval
stream = ssc.queueStream(rdd_queue).map(lambda word: (word, 1))
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:14
-------------------------------------------
('s', 1)
('c', 1)
('c', 1)
('g', 1)
('h', 1)
('z', 1)
('x', 1)
('u', 1)
('x', 1)
('j', 1)
...

-------------------------------------------
Time: 2019-10-31 16:24:15
-------------------------------------------
('j', 1)
('z', 1)
('m', 1)
('n', 1)
('s', 1)
('d', 1)
('n', 1)
('o', 1)
('x', 1)
('h', 1)
...

5.1.2. Flat map

[2]:
from pyspark.streaming import StreamingContext
from random import randint

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data = [[i for i in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).flatMap(lambda num: [(num, randint(1, 10)) for _ in range(num)])
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:17
-------------------------------------------
(1, 8)
(2, 2)
(2, 9)
(3, 4)
(3, 5)
(3, 1)
(4, 8)
(4, 9)
(4, 1)
(4, 9)
...

-------------------------------------------
Time: 2019-10-31 16:24:18
-------------------------------------------
(1, 8)
(2, 2)
(2, 9)
(3, 4)
(3, 5)
(3, 1)
(4, 8)
(4, 9)
(4, 1)
(4, 9)
...

5.1.3. Filter

[3]:
from pyspark.streaming import StreamingContext
from random import randint

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data = [[randint(1, 100) for i in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).filter(lambda num: num % 2 == 0)
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:19
-------------------------------------------
88
16
30
8
58
68
30
80
62
86
...

-------------------------------------------
Time: 2019-10-31 16:24:20
-------------------------------------------
26
54
16
12
38
22
68
48
78
20
...

5.1.4. Repartition

[4]:
from pyspark.streaming import StreamingContext
from random import randint

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data = [[randint(1, 100) for i in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).repartition(1)
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:21
-------------------------------------------
30
23
32
29
60
10
73
57
92
85
...

-------------------------------------------
Time: 2019-10-31 16:24:22
-------------------------------------------
95
93
13
41
60
11
24
57
67
70
...

5.1.5. Union

[5]:
from pyspark.streaming import StreamingContext
from random import randint

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data1 = [[randint(1, 100) for i in range(100)] for _ in range(100)]
input_data2 = [[randint(1, 100) for i in range(100)] for _ in range(100)]

rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

stream1 = ssc.queueStream(rdd_queue1).filter(lambda num: num % 2 == 0)
stream2 = ssc.queueStream(rdd_queue2).filter(lambda num: num % 2 == 0)

stream = stream1.union(stream2)
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:24
-------------------------------------------
64
56
40
26
86
92
94
70
84
96
...

-------------------------------------------
Time: 2019-10-31 16:24:25
-------------------------------------------
94
42
68
74
88
96
92
80
66
2
...

5.1.6. Count

[6]:
from pyspark.streaming import StreamingContext
from random import randint

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for i in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).count()
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:27
-------------------------------------------
6

-------------------------------------------
Time: 2019-10-31 16:24:28
-------------------------------------------
2

5.1.7. Reduce

[7]:
from pyspark.streaming import StreamingContext
from random import randint

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for i in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).reduce(lambda a, b: a + b)
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:30
-------------------------------------------
129

-------------------------------------------
Time: 2019-10-31 16:24:31
-------------------------------------------
572

5.1.8. Count by value

[8]:
from pyspark.streaming import StreamingContext
from random import randint

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for i in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).countByValue()
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:33
-------------------------------------------
(57, 1)
(75, 1)

-------------------------------------------
Time: 2019-10-31 16:24:34
-------------------------------------------
(60, 2)
(40, 1)
(20, 1)
(72, 1)
(56, 1)
(28, 1)
(96, 1)
(69, 1)
(13, 1)
(53, 1)
...

5.1.9. Reduce by key

[9]:
from pyspark.streaming import StreamingContext
from random import randint

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data = [[(randint(1, 2), randint(1, 100)) for _ in range(randint(1, 20))] for i in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).reduceByKey(lambda a, b: a + b)
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:36
-------------------------------------------
(1, 178)

-------------------------------------------
Time: 2019-10-31 16:24:37
-------------------------------------------
(1, 104)
(2, 79)

5.1.10. Join

[10]:
from pyspark.streaming import StreamingContext
from random import randint, choice

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data1 = [[(choice([0, 1]), randint(1, 2)) for _ in range(5)] for _ in range(100)]
input_data2 = [[(choice([0, 1]), choice(['a', 'b'])) for _ in range(5)] for _ in range(100)]

rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

counts1 = ssc.queueStream(rdd_queue1)
counts2 = ssc.queueStream(rdd_queue2)

stream = counts1.join(counts2)
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:39
-------------------------------------------
(0, (1, 'b'))
(0, (1, 'a'))
(1, (2, 'b'))
(1, (2, 'a'))
(1, (2, 'a'))
(1, (2, 'b'))
(1, (2, 'a'))
(1, (2, 'a'))
(1, (2, 'b'))
(1, (2, 'a'))
...

-------------------------------------------
Time: 2019-10-31 16:24:40
-------------------------------------------
(0, (2, 'b'))
(0, (2, 'a'))
(1, (1, 'b'))
(1, (1, 'b'))
(1, (1, 'a'))
(1, (1, 'b'))
(1, (1, 'b'))
(1, (1, 'a'))
(1, (1, 'b'))
(1, (1, 'b'))
...

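join keeps only the keys that appear in both streams within the same batch. Pair DStreams also offer leftOuterJoin, rightOuterJoin and fullOuterJoin for keeping unmatched keys; a minimal sketch with the same two streams:

outer = counts1.fullOuterJoin(counts2)
outer.pprint()  # values missing on one side show up as None
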
5.1.11. Cogroup

[11]:
from pyspark.streaming import StreamingContext
from random import randint, choice

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data1 = [[(choice([0, 1]), randint(1, 2)) for _ in range(5)] for _ in range(100)]
input_data2 = [[(choice([0, 1]), choice(['a', 'b'])) for _ in range(5)] for _ in range(100)]

rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

counts1 = ssc.queueStream(rdd_queue1)
counts2 = ssc.queueStream(rdd_queue2)

stream = counts1.cogroup(counts2)
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:42
-------------------------------------------
(0, (<pyspark.resultiterable.ResultIterable object at 0x7f87990f9f90>, <pyspark.resultiterable.ResultIterable object at 0x7f87990f9a90>))
(1, (<pyspark.resultiterable.ResultIterable object at 0x7f87990f9e10>, <pyspark.resultiterable.ResultIterable object at 0x7f87990f3d90>))

-------------------------------------------
Time: 2019-10-31 16:24:43
-------------------------------------------
(0, (<pyspark.resultiterable.ResultIterable object at 0x7f87990fc490>, <pyspark.resultiterable.ResultIterable object at 0x7f87990fc710>))
(1, (<pyspark.resultiterable.ResultIterable object at 0x7f87990fc650>, <pyspark.resultiterable.ResultIterable object at 0x7f87990ece90>))

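pprint only shows the lazy ResultIterable wrappers returned by cogroup. To see the grouped values themselves, the two iterables can be materialized into lists first; a minimal sketch reusing the streams above:

readable = counts1.cogroup(counts2).mapValues(lambda groups: (list(groups[0]), list(groups[1])))
readable.pprint()
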
5.1.12. Transform

[12]:
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

input_data = [[i for i in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).transform(lambda rdd: rdd.filter(lambda x: x % 2 == 0))
stream.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:44
-------------------------------------------
0
2
4
6
8
10
12
14
16
18
...

-------------------------------------------
Time: 2019-10-31 16:24:45
-------------------------------------------
0
2
4
6
8
10
12
14
16
18
...

-------------------------------------------
Time: 2019-10-31 16:24:46
-------------------------------------------
0
2
4
6
8
10
12
14
16
18
...

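transform is most useful for RDD operations that have no direct DStream counterpart. A minimal sketch of the same pattern, sorting each batch in descending order:

sorted_stream = ssc.queueStream(rdd_queue).transform(lambda rdd: rdd.sortBy(lambda x: x, ascending=False))
sorted_stream.pprint()
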
5.2. Window operations

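The examples in this section read from a plain TCP socket, so they assume some process is already writing space-separated words to the given port on localhost (301 here, 300 in later cells). A purely illustrative stand-in for such a feeder, run in a separate process before starting the streaming context:

import socket
from random import choice
from string import ascii_lowercase
from time import sleep

# Hypothetical feeder: accept one client and send a short line of random letters
# every second. Binding to ports below 1024 may require elevated privileges.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 301))
server.listen(1)
conn, _ = server.accept()
while True:
    conn.sendall((' '.join(choice(ascii_lowercase) for _ in range(5)) + '\n').encode())
    sleep(1)
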
5.2.1. Window

[13]:
from pyspark.streaming import StreamingContext
from time import sleep

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 301)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))

stream = pairs.window(5, 1)  # window length 5 s, sliding interval 1 s
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:24:48
-------------------------------------------
('y', 1)
('u', 1)
('n', 1)
('q', 1)

-------------------------------------------
Time: 2019-10-31 16:24:49
-------------------------------------------
('y', 1)
('u', 1)
('n', 1)
('q', 1)
('q', 1)
('j', 1)
('b', 1)
('l', 1)
('o', 1)
('x', 1)
...

-------------------------------------------
Time: 2019-10-31 16:24:50
-------------------------------------------
('y', 1)
('u', 1)
('n', 1)
('q', 1)
('q', 1)
('j', 1)
('b', 1)
('l', 1)
('o', 1)
('x', 1)
...

-------------------------------------------
Time: 2019-10-31 16:24:55
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 16:24:56
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 16:24:57
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 16:24:58
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 16:24:59
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 16:25:00
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 16:25:01
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 16:25:02
-------------------------------------------

5.2.2. Count by window

[14]:
from pyspark.streaming import StreamingContext
from time import sleep

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 301)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))

stream = pairs.countByWindow(5, 1)  # number of elements in the last 5 s, updated every 1 s
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:25:04
-------------------------------------------
4

-------------------------------------------
Time: 2019-10-31 16:25:05
-------------------------------------------
14

5.2.3. Reduce by window

[15]:
from pyspark.streaming import StreamingContext
from time import sleep

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 301)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))

stream = pairs.reduceByWindow(lambda a, b: a + b, lambda a, b: a - b, 5, 1)  # reduce func, inverse func, window 5 s, slide 1 s
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:25:20
-------------------------------------------
('c', 1, 'v', 1, 'w', 1, 'c', 1, 'y', 1, 'r', 1)

-------------------------------------------
Time: 2019-10-31 16:25:21
-------------------------------------------
('c', 1, 'v', 1, 'w', 1, 'c', 1, 'y', 1, 'r', 1, 'c', 1, 'n', 1, 'q', 1, 'j', 1, 'p', 1, 'v', 1, 'y', 1, 'a', 1, 'm', 1, 'w', 1)

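The elements here are (word, 1) tuples, so a + b concatenates tuples instead of summing counts, which is why the output grows into long flat tuples (and why the inverse a - b cannot work once data starts leaving the window). To get a numeric total over the window, one could reduce plain counts instead; a minimal sketch reusing the words stream from the cell above:

totals = words.map(lambda w: 1).reduceByWindow(lambda a, b: a + b,  # add values entering the window
                                               lambda a, b: a - b,  # remove values leaving the window
                                               5, 1)
totals.pprint()
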
5.2.4. Reduce by key and window

[16]:
from pyspark.streaming import StreamingContext
from time import sleep

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 300)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))

stream = pairs.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 5, 1)  # add entering batches, subtract leaving ones; window 5 s, slide 1 s
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:25:36
-------------------------------------------
('81', 1)
('67', 1)
('55', 1)

-------------------------------------------
Time: 2019-10-31 16:25:37
-------------------------------------------
('4', 1)
('10', 1)
('81', 1)
('67', 2)
('62', 1)
('49', 1)
('96', 1)
('9', 1)
('55', 1)
('5', 1)
...

-------------------------------------------
Time: 2019-10-31 16:25:38
-------------------------------------------
('4', 1)
('10', 1)
('50', 1)
('82', 1)
('57', 1)
('26', 1)
('81', 1)
('67', 3)
('62', 1)
('49', 1)
...

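The second lambda is an inverse of the reduce function: it lets Spark update the window incrementally by subtracting the batch that slides out, and it is what makes the checkpoint directory mandatory. When no inverse is available, passing None is also valid; the whole 5-second window is then recomputed on every slide. A minimal sketch of that variant:

stream = pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, 5, 1)
stream.pprint()
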
5.2.5. Count by value and window

[17]:
from pyspark.streaming import StreamingContext
from time import sleep

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 300)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))

stream = pairs.countByValueAndWindow(5, 1)
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:25:52
-------------------------------------------
(('10', 1), 1)
(('64', 1), 1)
(('69', 1), 1)

-------------------------------------------
Time: 2019-10-31 16:25:53
-------------------------------------------
(('6', 1), 1)
(('80', 1), 1)
(('31', 1), 1)
(('81', 1), 1)
(('25', 1), 1)
(('30', 1), 1)
(('10', 1), 1)
(('64', 1), 1)
(('53', 1), 1)
(('16', 1), 1)
...

-------------------------------------------
Time: 2019-10-31 16:25:54
-------------------------------------------
(('6', 1), 1)
(('80', 1), 1)
(('31', 1), 1)
(('81', 1), 1)
(('25', 1), 1)
(('30', 1), 1)
(('49', 1), 1)
(('18', 1), 1)
(('62', 1), 1)
(('10', 1), 1)
...

5.3. Output operations

5.3.1. Save as text files

[18]:
from pyspark.streaming import StreamingContext
from time import sleep

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 300)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))

stream = pairs.countByValueAndWindow(5, 1)
stream.pprint()
stream.saveAsTextFiles('dstream/data', 'txt')

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
-------------------------------------------
Time: 2019-10-31 16:26:08
-------------------------------------------
(('42', 1), 1)
(('30', 1), 1)
(('57', 1), 1)
(('38', 1), 1)
(('93', 1), 1)
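
saveAsTextFiles writes one directory per batch, named from the prefix, the batch time in milliseconds and the suffix (here dstream/data-<timestamp>.txt). A minimal sketch for loading everything back after the stream has stopped:

# Illustrative: read all batch directories written above into a single RDD of strings.
saved = sc.textFile('dstream/data-*.txt')
print(saved.count())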