1. Spark Streaming, DStreams
1.1. Local Spark setup
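The cell below creates a local SparkSession and a small helper, get_streaming_context, that hands every example a fresh StreamingContext with its own checkpoint directory. A stopped StreamingContext cannot be restarted, so each example builds a new one on top of the shared SparkContext; the checkpoint directory is needed later by the windowed operations that use inverse reduce functions.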
[ ]:
# Local Spark setup for the book examples.
import shutil
from itertools import count
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "dstreams"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-dstreams")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "false")
)
spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

sqlContext = spark  # legacy alias kept for examples that still reference it

_stream_counter = count()

def get_streaming_context(batch_duration=1):
    """Return a fresh StreamingContext with its own clean checkpoint directory."""
    checkpoint_dir = OUTPUT_DIR / f"checkpoint-{next(_stream_counter)}"
    shutil.rmtree(checkpoint_dir, ignore_errors=True)
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    ssc = StreamingContext(sc, batch_duration)
    ssc.checkpoint(str(checkpoint_dir))
    return ssc
1.2. Transformations
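All examples follow the same pattern: queueStream turns a list of pre-built RDDs into a DStream, consuming one RDD per batch interval, and the transformation under discussion is applied independently to every batch. pprint prints the first ten elements of each batch. Each cell sleeps for a few seconds after ssc.start() so that several batches are processed before the context is stopped gracefully, leaving the shared SparkContext running.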
1.2.1. Map
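map passes every element of each batch through a function; here every random letter becomes a (letter, 1) pair.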
[ ]:
from random import choice
from time import sleep

ssc = get_streaming_context()

letters = list('abcdefghijklmnopqrstuvwxyz')
input_data = [[choice(letters) for _ in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).map(lambda word: (word, 1))
stream.pprint()

ssc.start()
sleep(3)  # let a few batches print before stopping
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.2. Flat map
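flatMap maps each element to zero or more output elements, flattening the results into one batch.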
[ ]:
from random import randint
from time import sleep

ssc = get_streaming_context()

input_data = [list(range(100)) for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

# Each number n becomes n (n, random) pairs; 0 contributes nothing.
stream = ssc.queueStream(rdd_queue).flatMap(
    lambda num: [(num, randint(1, 10)) for _ in range(num)]
)
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.3. Filter
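filter keeps only the elements for which the predicate returns true; here, the even numbers.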
[ ]:
from random import randint
from time import sleep

ssc = get_streaming_context()

input_data = [[randint(1, 100) for _ in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).filter(lambda num: num % 2 == 0)
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.4. Repartition
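repartition changes the number of partitions of each batch's RDD, increasing or decreasing the level of parallelism; the data itself is unchanged.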
[ ]:
from random import randint
from time import sleep

ssc = get_streaming_context()

input_data = [[randint(1, 100) for _ in range(100)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

# The printed values are unchanged; each batch's RDD now has a single partition.
stream = ssc.queueStream(rdd_queue).repartition(1)
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.5. Union
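union merges two DStreams batch by batch, returning a stream that contains the elements of both.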
[ ]:
from random import randint
from time import sleep

ssc = get_streaming_context()

input_data1 = [[randint(1, 100) for _ in range(100)] for _ in range(100)]
input_data2 = [[randint(1, 100) for _ in range(100)] for _ in range(100)]
rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

stream1 = ssc.queueStream(rdd_queue1).filter(lambda num: num % 2 == 0)
stream2 = ssc.queueStream(rdd_queue2).filter(lambda num: num % 2 == 0)

stream = stream1.union(stream2)
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.6. Count
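count reduces each batch to a single-element RDD holding the number of elements in that batch.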
[ ]:
from random import randint
from time import sleep

ssc = get_streaming_context()

# Batches of varying size, so the count changes from batch to batch.
input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).count()
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.7. Reduce
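reduce aggregates each batch with an associative, commutative function and emits the result as a single-element RDD, here the sum of the batch.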
[ ]:
from random import randint
from time import sleep

ssc = get_streaming_context()

input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).reduce(lambda a, b: a + b)
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.8. Count by value
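countByValue emits (value, count) pairs giving the frequency of each distinct element within the batch.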
[ ]:
from random import randint
from time import sleep

ssc = get_streaming_context()

input_data = [[randint(1, 100) for _ in range(randint(1, 20))] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).countByValue()
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.9. Reduce by key
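reduceByKey works on streams of (key, value) pairs and merges the values of each key within a batch, here summing them.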
[ ]:
from random import randint
from time import sleep

ssc = get_streaming_context()

# (key, value) pairs with keys 1 and 2.
input_data = [[(randint(1, 2), randint(1, 100)) for _ in range(randint(1, 20))] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).reduceByKey(lambda a, b: a + b)
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.10. Join
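join matches two streams of (key, value) pairs on their keys within each batch, producing (key, (value1, value2)) for every combination of matching values.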
[ ]:
from random import choice, randint
from time import sleep

ssc = get_streaming_context()

input_data1 = [[(choice([0, 1]), randint(1, 2)) for _ in range(5)] for _ in range(100)]
input_data2 = [[(choice([0, 1]), choice(['a', 'b'])) for _ in range(5)] for _ in range(100)]
rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

stream1 = ssc.queueStream(rdd_queue1)
stream2 = ssc.queueStream(rdd_queue2)

stream = stream1.join(stream2)
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.11. Cogroup
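cogroup also matches on keys but, instead of producing one output per combination, it collects all values for a key from both streams into a pair of iterables: (key, (values1, values2)).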
[ ]:
from random import choice, randint
from time import sleep

ssc = get_streaming_context()

input_data1 = [[(choice([0, 1]), randint(1, 2)) for _ in range(5)] for _ in range(100)]
input_data2 = [[(choice([0, 1]), choice(['a', 'b'])) for _ in range(5)] for _ in range(100)]
rdd_queue1 = [ssc.sparkContext.parallelize(item) for item in input_data1]
rdd_queue2 = [ssc.sparkContext.parallelize(item) for item in input_data2]

stream1 = ssc.queueStream(rdd_queue1)
stream2 = ssc.queueStream(rdd_queue2)

# Materialize the grouped iterables so pprint shows lists, not opaque objects.
stream = stream1.cogroup(stream2).mapValues(lambda groups: (list(groups[0]), list(groups[1])))
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.2.12. Transform
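transform applies an arbitrary RDD-to-RDD function to each batch, which makes any RDD operation available even when DStream does not expose it directly; the filter here is just a stand-in for such a function.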
[ ]:
from time import sleep

ssc = get_streaming_context()

input_data = [list(range(100)) for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

stream = ssc.queueStream(rdd_queue).transform(lambda rdd: rdd.filter(lambda x: x % 2 == 0))
stream.pprint()

ssc.start()
sleep(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.3. Window operations
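Window operations compute over a sliding window of batches. They take a window duration (how much data the window covers) and a slide duration (how often a result is emitted), both multiples of the batch interval; the examples use a 5-second window sliding every second over 1-second batches. The variants that take an inverse reduce function update the window incrementally and therefore require checkpointing, which get_streaming_context already enables.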
1.3.1. Window
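window itself just re-batches the stream: every slide interval it emits one RDD containing all elements received during the window.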
[ ]:
from time import sleep

ssc = get_streaming_context()

input_data = [list('abcde') for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]

words = ssc.queueStream(rdd_queue)
pairs = words.map(lambda word: (word, 1))

stream = pairs.window(5, 1)
stream.pprint()

ssc.start()
sleep(7)  # long enough for the 5-second window to fill and start sliding
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.3.2. Count by window
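countByWindow maintains a sliding count of all elements in the window.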
[ ]:
from time import sleep

ssc = get_streaming_context()

input_data = [list('abcde') for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]
words = ssc.queueStream(rdd_queue)

stream = words.countByWindow(5, 1)
stream.pprint()

ssc.start()
sleep(7)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.3.3. Reduce by window
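reduceByWindow aggregates the whole window with a reduce function; the inverse function "subtracts" the batch that falls out of the window, so each step only touches the entering and leaving batches instead of rescanning the window.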
[ ]:
from time import sleep

ssc = get_streaming_context()

input_data = [list('abcde') for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]
words = ssc.queueStream(rdd_queue)

# Map every word to 1 so the windowed sum is an element count; reducing
# (word, 1) pairs directly would break the inverse function, which cannot
# subtract tuples.
ones = words.map(lambda word: 1)
stream = ones.reduceByWindow(lambda a, b: a + b, lambda a, b: a - b, 5, 1)
stream.pprint()

ssc.start()
sleep(7)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.3.4. Reduce by key and window
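reduceByKeyAndWindow is the per-key version: it keeps an incrementally updated windowed aggregate for every key of a (key, value) stream.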
[ ]:
from time import sleep

ssc = get_streaming_context()

input_data = [[str(i) for i in range(10)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]
words = ssc.queueStream(rdd_queue)
pairs = words.map(lambda word: (word, 1))

stream = pairs.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 5, 1)
stream.pprint()

ssc.start()
sleep(7)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.3.5. Count by value and window
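countByValueAndWindow counts the occurrences of each distinct element across the window, emitting (value, count) pairs.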
[ ]:
from time import sleep

ssc = get_streaming_context()

input_data = [[str(i) for i in range(10)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]
words = ssc.queueStream(rdd_queue)

stream = words.countByValueAndWindow(5, 1)
stream.pprint()

ssc.start()
sleep(7)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
1.4. Output operations
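Output operations are what actually trigger execution of a DStream's transformations, analogous to actions on RDDs; pprint, used throughout this chapter, is itself one of them.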
1.4.1. Save as text file
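saveAsTextFiles(prefix, suffix) writes each generated RDD as a text file, one output directory per batch, named after the prefix and the batch timestamp.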
[ ]:
from time import sleep

ssc = get_streaming_context()

input_data = [[str(i) for i in range(10)] for _ in range(100)]
rdd_queue = [ssc.sparkContext.parallelize(item) for item in input_data]
words = ssc.queueStream(rdd_queue)

stream = words.countByValueAndWindow(5, 1)
stream.pprint()

# One output directory per batch, named data-<timestamp>.txt.
stream.saveAsTextFiles(str(OUTPUT_DIR / 'dstream' / 'data'), 'txt')

ssc.start()
sleep(7)
ssc.stop(stopSparkContext=False, stopGraceFully=True)