Apache Spark Creating Pair RDDs and associated Transformations Part 2

Objective

This article is the second part of Apache Spark Creating Pair RDDs and associated Transformations Part 1. In this article I will go into more detail about Pair RDD objects.

Parallelism Optimisation

Every RDD has a fixed number of partitions, which determines the degree of parallelism when running operations on that RDD. Spark is able to determine a reasonable default, but we can specify the number of partitions ourselves to tune performance.

Most of the transformations explored so far accept an additional parameter that defines the number of partitions.

dat = [("first", 1), ("second",2), ("third", 3)]
sc.parallelize(dat).reduceByKey(lambda x,y: x+y,5)
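
To confirm how many partitions an RDD actually ended up with, we can call getNumPartitions() on it. Below is a minimal sketch; the figure of 5 simply mirrors the example above.

rdd = sc.parallelize(dat).reduceByKey(lambda x, y: x + y, 5)
# getNumPartitions() reports the number of partitions backing the RDD.
print(rdd.getNumPartitions())
# 5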

Group By

Often with key-value pairs we want to group by key to better understand the values. With Pair RDD objects we can use the groupByKey() operator, which returns an RDD of pairs where each key of type K is mapped to an iterable of its values, i.e. (K, Iterable[V]).
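
For example, here is a minimal sketch of grouping a small Pair RDD by key (the data is made up for illustration):

dat = [("a", 1), ("b", 2), ("a", 3)]
# groupByKey() collects all values sharing a key into a single iterable.
for key, vals in sc.parallelize(dat).groupByKey().collect():
    print("{} {}".format(key, list(vals)))
# a [1, 3]
# b [2]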

If we need to group keys from multiple RDDs we can use a function called cogroup(). Assuming RDD1 is of type [(K, V)] and RDD2 is of type [(K, W)], this operation returns an RDD of key-value pairs of the form (K, (Iterable[V], Iterable[W])). The cogroup() function is the building block for joining RDDs, which is discussed next.
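
As a quick illustration, here is a small sketch of cogroup() on two made-up RDDs; note that a key present in only one RDD still appears, paired with an empty iterable on the other side.

rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("a", "x"), ("c", "y")])
# cogroup() pairs each key with the iterables of values from both RDDs.
for key, (vs, ws) in rdd1.cogroup(rdd2).collect():
    print("{} {} {}".format(key, list(vs), list(ws)))
# a [1] ['x']
# b [2] []
# c [] ['y']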

Joins

Often we will need to join Pair RDD objects, and Spark supports a range of options including inner joins, left and right outer joins, and cross joins.

The default behaviour of the join() operator is an inner join.

address = [("address1", "123 main st"),("address2","234 blah st"),("address3","345 main st")]
rating = [("address1", 3),("address3", 2)]
rdd1 = sc.parallelize(address)
rdd2 = sc.parallelize(rating)
for key, val in rdd1.join(rdd2).collect():
print("{} {}".format(key,val))
# address1 ('123 main st', 3)
# address3 ('345 main st', 2)

We can also do left and right outer joins.

for key, val in rdd1.leftOuterJoin(rdd2).collect():
print("{} {}".format(key,val))
# address1 ('123 main st', 3)
# address2 ('234 blah st', None)
# address3 ('345 main st', 2)

for key, val in rdd1.rightOuterJoin(rdd2).collect():
print("{} {}".format(key,val))
# address1 ('123 main st', 3)
# address3 ('345 main st', 2)
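
Cross joins were mentioned above but not shown; in PySpark a cross join between two RDDs can be expressed with cartesian(), which pairs every element of one RDD with every element of the other. A minimal sketch reusing the RDDs above:

for left, right in rdd1.cartesian(rdd2).collect():
    print("{} {}".format(left, right))
# ('address1', '123 main st') ('address1', 3)
# ('address1', '123 main st') ('address3', 2)
# ... one line for each of the 3 x 2 = 6 combinations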

Sorting

We can sort a Pair RDD as long as there is an ordering defined on the key. Once sorted, any subsequent call on the sorted data, such as collect() or save(), will return the data in sorted order.

Using the sortByKey() operation we can sort our data in ascending or descending order, or according to a custom key function. Below is an example of sorting the integer keys of a Pair RDD as strings.

rdd_int = sc.parallelize([(1, "first"), (2, "second"), (-1, "negative one")])
for key, val in rdd_int.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x)).collect():
    print("{} {}".format(key, val))
# -1 negative one
# 1 first
# 2 second

In this example the keys were integers; however, since we sorted using the custom key function str(x), the keys were converted to strings before being sorted in ascending order. You can check that this order is correct by verifying that "-1" < "1" evaluates to True in Python.
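
Sorting keys as strings does not always match numeric order, so the choice of keyfunc matters. Here is a minimal sketch (the data is made up for illustration) showing the difference:

rdd_mixed = sc.parallelize([(10, "ten"), (2, "two")])
# As strings, "10" < "2" because comparison is character by character.
print(rdd_mixed.sortByKey(keyfunc=lambda x: str(x)).keys().collect())
# [10, 2]
# With the default keyfunc the integer keys sort numerically.
print(rdd_mixed.sortByKey().keys().collect())
# [2, 10]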

Conclusion

This concludes the tutorial on transformations available on Pair RDD objects. I hope that you now have a better understanding of the difference between regular RDD objects and Pair RDD objects, as well as the similarities and differences between the transformations available for the two types of RDD objects.

Next I will discuss the Actions that are available on Pair RDD objects and go through some examples. You can read that article here: Apache Spark Pyspark Actions on Pair RDD.
