Apache Spark: When to use joins vs cogroup
A short tutorial on RDD transformations for key-value datasets
Working with key-value pairs in Spark, I've become well-versed in transforming Resilient Distributed Datasets (RDDs). However, I've struggled to find a comprehensive resource explaining the differences between join operations and cogroup().
This short article aims to fill that gap by comparing joins and cogroup() for RDDs using PySpark. For a Scala example, I found this Stack Overflow answer very useful.
Transformations on RDDs
join(), leftOuterJoin(), rightOuterJoin(), fullOuterJoin(), and cogroup() are all transformations that combine two RDDs of key-value pairs by key and return a single RDD of key-value pairs. (cartesian() also combines two RDDs, but it pairs every element of one with every element of the other and does not look at keys.) While joining datasets over a common key is a crucial transformation supported by Spark with both DataFrames and RDDs, cogroup() as described here is an RDD transformation on key-value pairs. The fundamental differences between join transformations and cogroup() are:
The output types
The output structure
Output types
Consider two RDDs containing (key, v) and (key, u) pairs, respectively. Join transformations return an RDD with one flat tuple per matching pair of values, in the form: (key, (v, u)).
In contrast, cogroup() returns one element per key found in either RDD, in the form: (key, (ResultIterable<v>, ResultIterable<u>)). A ResultIterable can be treated as a standard Python iterable: it is a simple iterable with the added capability of being pickled.
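To make the two shapes concrete, here is a minimal plain-Python sketch that models them on in-memory lists rather than RDDs. The helpers model_join and model_cogroup are hypothetical names written for this illustration, not part of the PySpark API, and ordinary lists stand in for ResultIterable.

```python
from collections import defaultdict


def model_join(left, right):
    """Model of join(): one flat (key, (v, u)) tuple per matching value pair."""
    right_by_key = defaultdict(list)
    for key, u in right:
        right_by_key[key].append(u)
    # .get() avoids creating empty entries for keys that exist only in `left`
    return [(key, (v, u)) for key, v in left for u in right_by_key.get(key, [])]


def model_cogroup(left, right):
    """Model of cogroup(): one (key, (values_left, values_right)) entry per key."""
    groups = {}
    for key, v in left:
        groups.setdefault(key, ([], []))[0].append(v)
    for key, u in right:
        groups.setdefault(key, ([], []))[1].append(u)
    # Sorted by key only to make the printed result deterministic
    return sorted(groups.items())


pairs_v = [("a", 1), ("a", 2)]
pairs_u = [("a", 3)]

print(model_join(pairs_v, pairs_u))     # [('a', (1, 3)), ('a', (2, 3))]
print(model_cogroup(pairs_v, pairs_u))  # [('a', ([1, 2], [3]))]
```

The join model emits two tuples for key "a", while the cogroup model emits a single entry whose value is a pair of collections.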
Output structure
Like inner joins in SQL and PySpark DataFrames, join() for RDDs returns key-value pairs only for keys present in both RDDs. cogroup(), on the other hand, includes in the output every key found in either RDD. For a key that doesn't exist in one RDD, the corresponding value is an empty iterable. In this respect cogroup() behaves more like a full outer join, with the distinction that fullOuterJoin() fills the missing side with None instead of an empty iterable.
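The key coverage of each transformation can be summarized with set operations. The following is a plain-Python sketch on in-memory lists, not Spark code; key_coverage is a hypothetical helper introduced only for this illustration.

```python
def key_coverage(left, right):
    """Return, per transformation, the set of keys that appear in its output.

    `left` plays the role of the RDD the method is called on,
    `right` the RDD passed as the argument.
    """
    left_keys = {key for key, _ in left}
    right_keys = {key for key, _ in right}
    return {
        "join": left_keys & right_keys,           # keys present in both
        "leftOuterJoin": left_keys,               # every key of the left side
        "rightOuterJoin": right_keys,             # every key of the right side
        "fullOuterJoin": left_keys | right_keys,  # keys present in either
        "cogroup": left_keys | right_keys,        # same keys as fullOuterJoin
    }


left = [("a", 1), ("b", 2)]
right = [("a", 3), ("c", 4)]

for name, keys in key_coverage(left, right).items():
    print(name, sorted(keys))
```

In this model cogroup() and fullOuterJoin() always cover the same keys; they differ only in how the values for each key are laid out.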
Full outer join vs cogroup
So why the need for both fullOuterJoin() and cogroup()? The difference lies in how each transformation handles repeated keys and in the resulting output structure.
With repeated keys, fullOuterJoin() outputs a separate tuple for each combination of matching values, so a repeated key appears in several output tuples:
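from pyspark.sql import SparkSession

# Reuse the active session (e.g. in a notebook) or start a new one
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# Notice the repeated key "a" in rdd1
rdd1 = sc.parallelize([("a", 1), ("a", 2)])
rdd2 = sc.parallelize([("a", 3), ("b", 4)])
joined_rdd = rdd1.fullOuterJoin(rdd2)
joined_rdd.collect()
# Output (element order may vary)
> [('a', (1, 3)), ('a', (2, 3)), ('b', (None, 4))]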
On the other hand, cogroup() gathers all values for the same key into one iterable per RDD:
cogrouped_rdd = rdd1.cogroup(rdd2)
# Collect, then materialize each iterable as a list for printing
for key, (iterable1, iterable2) in cogrouped_rdd.collect():
    print(f"({key}, ({list(iterable1)}, {list(iterable2)}))")
# Output (key order may vary)
> (a, ([1, 2], [3]))
> (b, ([], [4]))
This output structure resembles a multimap, making cogroup() more convenient than fullOuterJoin() for operations that require grouping.
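One way to see how the two outputs relate is to rebuild the full outer join result from the cogrouped one. The sketch below is plain Python operating on an already collected, list-based copy of the cogroup output; full_outer_from_cogroup is a hypothetical helper for illustration and is not presented as how Spark computes the join.

```python
from itertools import product


def full_outer_from_cogroup(cogrouped):
    """Expand (key, (values1, values2)) entries into full-outer-join tuples."""
    result = []
    for key, (values1, values2) in cogrouped:
        # An empty side becomes a single None, mirroring fullOuterJoin()
        side1 = list(values1) or [None]
        side2 = list(values2) or [None]
        # Every combination of values for the key yields one output tuple
        result.extend((key, pair) for pair in product(side1, side2))
    return result


# The cogroup output from the example above, with iterables turned into lists
cogrouped = [("a", ([1, 2], [3])), ("b", ([], [4]))]

print(full_outer_from_cogroup(cogrouped))
# [('a', (1, 3)), ('a', (2, 3)), ('b', (None, 4))]
```

Going in this direction is a simple expansion, whereas recovering the grouped form from the joined tuples would require regrouping by key, which is why the cogrouped form is the handier starting point for per-key logic.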