1. Spark Streaming, DStreams

1.1. Local Spark setup

[ ]:
# Local Spark setup for the book examples.
from pathlib import Path
import shutil
from itertools import count
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

# All example output (checkpoints, saved text files) lands under the
# current working directory so the notebook is self-contained.
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "dstreams"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Small local cluster: 4-way parallelism keeps per-batch output readable;
# AQE is disabled so the fixed partition counts are actually honored.
builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-dstreams")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "false")
)
spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
# Kept for older book examples that refer to `sqlContext`.
sqlContext = spark

# Monotonic id so each example gets its own checkpoint directory.
_stream_counter = count()

def get_streaming_context(batch_duration: int = 1) -> StreamingContext:
    """Return a fresh StreamingContext on the shared SparkContext.

    Each call allocates a new, empty checkpoint directory (required by
    stateful/window operations) and sets the batch interval in seconds.
    Only one StreamingContext may be active at a time, so the caller is
    expected to stop the previous one before requesting another.
    """
    checkpoint_dir = OUTPUT_DIR / f"checkpoint-{next(_stream_counter)}"
    # Remove any leftovers from a previous run of the same cell.
    shutil.rmtree(checkpoint_dir, ignore_errors=True)
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    ssc = StreamingContext(sc, batch_duration)
    ssc.checkpoint(str(checkpoint_dir))
    return ssc

1.2. Transformations

1.2.1. Map

[ ]:
from pyspark.streaming import StreamingContext
from random import choice
from time import sleep

# map: turn each random letter into a (letter, 1) pair.
ssc = get_streaming_context()

alphabets = list('abcdefghijklmnopqrstuvwxyz')
input_data = [[choice(alphabets) for _ in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).map(lambda word: (word, 1))
stream.pprint()

ssc.start()
# Fix: stop() fired immediately after start(), before the first 1s batch
# could complete, so pprint() never showed any output. Let a few batches run.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.2. Flat map

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# flatMap: each number n expands into n pairs (n, random 1..10).
ssc = get_streaming_context()

input_data = [[i for i in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).flatMap(lambda num: [(num, randint(1, 10)) for _ in range(num)])
stream.pprint()

ssc.start()
# Fix: pause so at least one 1s batch is processed; stopping right after
# start() produced no output.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.3. Filter

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# filter: keep only the even numbers from each batch.
ssc = get_streaming_context()

input_data = [[randint(1, 100) for i in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).filter(lambda num: num % 2 == 0)
stream.pprint()

ssc.start()
# Fix: give the 1s batches time to run before stopping; an immediate
# stop() yielded no output.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.4. Repartition

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# repartition: collapse each batch RDD down to a single partition.
ssc = get_streaming_context()

input_data = [[randint(1, 100) for i in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).repartition(1)
stream.pprint()

ssc.start()
# Fix: without a pause stop() ran before the first batch completed and
# pprint() showed nothing.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.5. Union

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# union: merge two filtered streams batch-by-batch.
ssc = get_streaming_context()

input_data1 = [[randint(1, 100) for i in range(100)] for _ in range(100)]
input_data2 = [[randint(1, 100) for i in range(100)] for _ in range(100)]

rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

stream1 = ssc.queueStream(rdd_queue1).filter(lambda num: num % 2 == 0)
stream2 = ssc.queueStream(rdd_queue2).filter(lambda num: num % 2 == 0)

stream = stream1.union(stream2)
stream.pprint()

ssc.start()
# Fix: pause so at least one batch is processed before shutdown.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.6. Count

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# count: one element per batch — the batch's element count.
ssc = get_streaming_context()

input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for i in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).count()
stream.pprint()

ssc.start()
# Fix: let the 1s batches run; an immediate stop() produced no output.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.7. Reduce

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# reduce: sum all the numbers within each batch.
ssc = get_streaming_context()

input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for i in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).reduce(lambda a, b: a + b)
stream.pprint()

ssc.start()
# Fix: pause so at least one batch completes before stopping.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.8. Count by value

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# countByValue: per-batch frequency of each distinct number.
ssc = get_streaming_context()

input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for i in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).countByValue()
stream.pprint()

ssc.start()
# Fix: give batches time to run; stopping right after start() showed nothing.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.9. Reduce by key

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# reduceByKey: per-batch sums of the values for keys 1 and 2.
ssc = get_streaming_context()

input_data = [[(randint(1, 2), randint(1, 100)) for _ in range(randint(1, 20))] for i in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).reduceByKey(lambda a, b: a + b)
stream.pprint()

ssc.start()
# Fix: pause so at least one 1s batch is processed before shutdown.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.10. Join

[ ]:
from pyspark.streaming import StreamingContext
from random import randint, choice
from time import sleep

# join: inner-join two keyed streams batch-by-batch on keys 0/1.
ssc = get_streaming_context()

input_data1 = [[(choice([0, 1]), randint(1, 2)) for _ in range(5)] for _ in range(100)]
input_data2 = [[(choice([0, 1]), choice(['a', 'b'])) for _ in range(5)] for _ in range(100)]

rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

counts1 = ssc.queueStream(rdd_queue1)
counts2 = ssc.queueStream(rdd_queue2)

stream = counts1.join(counts2)
stream.pprint()

ssc.start()
# Fix: let a few batches complete; an immediate stop() produced no output.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.11. Cogroup

[ ]:
from pyspark.streaming import StreamingContext
from random import randint, choice
from time import sleep

# cogroup: group both streams' values per key within each batch.
ssc = get_streaming_context()

input_data1 = [[(choice([0, 1]), randint(1, 2)) for _ in range(5)] for _ in range(100)]
input_data2 = [[(choice([0, 1]), choice(['a', 'b'])) for _ in range(5)] for _ in range(100)]

rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

counts1 = ssc.queueStream(rdd_queue1)
counts2 = ssc.queueStream(rdd_queue2)

stream = counts1.cogroup(counts2)
stream.pprint()

ssc.start()
# Fix: pause so at least one batch is processed before shutdown.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.2.12. Transform

[ ]:
from pyspark.streaming import StreamingContext
from random import randint
from time import sleep

# transform: apply an arbitrary RDD-level operation (here, a filter).
ssc = get_streaming_context()

input_data = [[i for i in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).transform(lambda rdd: rdd.filter(lambda x: x % 2 == 0))
stream.pprint()

ssc.start()
# Fix: without a pause stop() fired before any 1s batch completed.
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.3. Window operations

1.3.1. Window

[ ]:
from pyspark.streaming import StreamingContext
from time import sleep

# window: all (char, 1) pairs seen in a sliding 5s window, emitted every 1s.
ssc = get_streaming_context()

batches = [list('abcde') for _ in range(100)]
queue = [ssc.sparkContext.parallelize(batch) for batch in batches]

char_stream = ssc.queueStream(queue)
char_pairs = char_stream.map(lambda ch: (ch, 1))

windowed = char_pairs.window(5, 1)
windowed.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.3.2. Count by window

[ ]:
from pyspark.streaming import StreamingContext
from time import sleep

# countByWindow: the element count of a sliding 5s window, every 1s.
ssc = get_streaming_context()

batches = [list('abcde') for _ in range(100)]
queue = [ssc.sparkContext.parallelize(batch) for batch in batches]

pair_stream = ssc.queueStream(queue).map(lambda ch: (ch, 1))

counted = pair_stream.countByWindow(5, 1)
counted.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.3.3. Reduce by window

[ ]:
from pyspark.streaming import StreamingContext
from time import sleep

# reduceByWindow: fold the 5s window with an add function and its
# inverse (the inverse lets Spark update the window incrementally).
ssc = get_streaming_context()

batches = [list('abcde') for _ in range(100)]
queue = [ssc.sparkContext.parallelize(batch) for batch in batches]

pair_stream = ssc.queueStream(queue).map(lambda ch: (ch, 1))

reduced = pair_stream.reduceByWindow(lambda a, b: a + b, lambda a, b: a - b, 5, 1)
reduced.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.3.4. Reduce by key and window

[ ]:
from pyspark.streaming import StreamingContext
from time import sleep

# reduceByKeyAndWindow: per-key windowed sums, maintained incrementally
# via the inverse (subtract) function.
ssc = get_streaming_context()

tokens = [str(n) for n in range(10)]
batches = [list(tokens) for _ in range(100)]
queue = [ssc.sparkContext.parallelize(batch) for batch in batches]

keyed = ssc.queueStream(queue).map(lambda tok: (tok, 1))

summed = keyed.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 5, 1)
summed.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.3.5. Count by value and window

[ ]:
from pyspark.streaming import StreamingContext
from time import sleep

# countByValueAndWindow: frequency of each distinct pair over a
# sliding 5s window, recomputed every 1s.
ssc = get_streaming_context()

tokens = [str(n) for n in range(10)]
batches = [list(tokens) for _ in range(100)]
queue = [ssc.sparkContext.parallelize(batch) for batch in batches]

keyed = ssc.queueStream(queue).map(lambda tok: (tok, 1))

frequencies = keyed.countByValueAndWindow(5, 1)
frequencies.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

1.4. Output operations

1.4.1. Save as text file

[ ]:
from pyspark.streaming import StreamingContext
from time import sleep

# saveAsTextFiles: persist each windowed result batch as text files
# under OUTPUT_DIR, in addition to printing it.
ssc = get_streaming_context()

tokens = [str(n) for n in range(10)]
batches = [list(tokens) for _ in range(100)]
queue = [ssc.sparkContext.parallelize(batch) for batch in batches]

keyed = ssc.queueStream(queue).map(lambda tok: (tok, 1))

frequencies = keyed.countByValueAndWindow(5, 1)
frequencies.pprint()
frequencies.saveAsTextFiles(str(OUTPUT_DIR / 'dstream' / 'data'), 'txt')

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)