Apache Spark: Creating Pair RDDs and Associated Transformations, Part 1

Overview

This article covers how to create Pair RDDs in Spark, discusses the various transformations that can be applied to Pair RDDs, and provides some examples.

How to Create a Pair RDD

There are ways to load data directly into Pair RDDs, which will be discussed in another article. In this article we will create Pair RDD objects by taking a normal RDD and applying a map() function to produce key/value pairs.

Below is an example that loads Spark's README.md file into a regular RDD object and then applies a map() function that uses the first word of each line as the key.

# Load the file as an RDD of lines, then key each line by its first word
readme = sc.textFile('README.md')
pair_rdd = readme.map(lambda x: (x.split(" ")[0], x))
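
To sanity-check the result you can peek at a few of the generated pairs (the exact output depends on the contents of your copy of README.md):

for key, line in pair_rdd.take(3):
    print("{} -> {}".format(key, line))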

Transformation Operations on Pair RDD

In addition to the transformations available on regular RDD objects, Pair RDD objects support extra transformations that are specific to key/value tuples. Below are some examples:

rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])

# reduceByKey() merges the values of each key with the given function
for val in rdd.reduceByKey(lambda x, y: x + y).collect():
    print(val)
# (1, 2)
# (3, 10)

# groupByKey() gathers the values of each key into an iterable
for key, vals in rdd.groupByKey().collect():
    for val in vals:
        print("{} {}".format(key, val))
# 1 2
# 3 4
# 3 6
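
Since groupByKey() returns an iterable of values per key, a common idiom is to materialize each group into a list with mapValues(list):

for key, vals in rdd.groupByKey().mapValues(list).collect():
    print("{} {}".format(key, vals))
# 1 [2]
# 3 [4, 6]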

# mapValues() applies a function to each value, leaving the key untouched
for key, val in rdd.mapValues(lambda x: x + 1).collect():
    print("{} {}".format(key, val))
# 1 3
# 3 5
# 3 7

# values() drops the keys and returns just the values
for val in rdd.values().collect():
    print("{}".format(val))
# 2
# 4
# 6
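
The keys() transformation is the counterpart of values() and returns just the keys, duplicates included:

for key in rdd.keys().collect():
    print("{}".format(key))
# 1
# 3
# 3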

These are just some of the basic transformations you can use on Pair RDD objects. Other useful transformations include flatMapValues(), sketched below, and combineByKey(), which is shown later in this article.
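
As a quick illustration of flatMapValues(): it applies a function that returns an iterable to each value and emits one output pair per element, keeping the original key. A minimal sketch:

# Each value x is expanded to the two numbers x and x + 1
for key, val in rdd.flatMapValues(lambda x: range(x, x + 2)).collect():
    print("{} {}".format(key, val))
# 1 2
# 1 3
# 3 4
# 3 5
# 3 6
# 3 7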

Transformations on Two Pair RDD Objects

To try the various transformations that operate on two Pair RDDs, we will reuse the rdd created in the previous example and create a second Pair RDD object:

rdd_2 = sc.parallelize([(3, 6)])

# subtractByKey() removes elements whose key appears in the other RDD
for key, val in rdd.subtractByKey(rdd_2).collect():
    print("{} {}".format(key, val))
# 1 2

# You can also perform some SQL-like join operations

# join() is an inner join: only keys present in both RDDs are kept
for key, val in rdd.join(rdd_2).collect():
    print("{} {}".format(key, val))
# 3 (4, 6)
# 3 (6, 6)

# rightOuterJoin() keeps every key from the right-hand RDD (rdd_2)
for key, val in rdd.rightOuterJoin(rdd_2).collect():
    print("{} {}".format(key, val))
# 3 (4, 6)
# 3 (6, 6)

# leftOuterJoin() keeps every key from the left-hand RDD (rdd), filling in None
for key, val in rdd.leftOuterJoin(rdd_2).collect():
    print("{} {}".format(key, val))
# 1 (2, None)
# 3 (4, 6)
# 3 (6, 6)
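
Spark also offers fullOuterJoin(), which keeps keys that appear in either RDD and fills in None for the missing side. In this particular example the result matches leftOuterJoin() because rdd_2 contains no keys that are absent from rdd:

for key, val in rdd.fullOuterJoin(rdd_2).collect():
    print("{} {}".format(key, val))
# 1 (2, None)
# 3 (4, 6)
# 3 (6, 6)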

Pair RDD objects behave like regular RDD objects and can therefore use the regular RDD transformations as well. Below is an example of filtering on the second element of each pair:

# The lambda receives the whole (key, value) pair, so index 1 is the value
res = rdd.filter(lambda pair: pair[1] < 3)
for key, val in res.collect():
    print("{} {}".format(key, val))
# 1 2

Aggregations

Aggregations are transformations that compute statistics over the values of key/value pairs. Spark provides several operations on Pair RDD objects that achieve this, including the following:

  • reduceByKey(): similar to reduce() on a regular RDD.
  • foldByKey(): similar to fold() on a regular RDD (see the sketch after this list).
  • mapValues(): similar to map() on a regular RDD.
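
As promised, a minimal foldByKey() sketch: it takes a per-key zero value plus a merge function, and here simply sums the values of each key:

for key, val in rdd.foldByKey(0, lambda x, y: x + y).collect():
    print("{} {}".format(key, val))
# 1 2
# 3 10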

Below is an example that computes the average value for each key:

# Pair each value with a count of 1, then sum the (value, count) pairs per key
sum_count = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
for key, (total, count) in sum_count.collect():
    print("average of key {} = {}".format(key, total / count))
# average of key 1 = 2.0
# average of key 3 = 5.0

The above transformations on Pair RDD objects are limited in that they return the same value type as the input Pair RDD. Spark's combineByKey() method, which is similar to aggregate(), lets us return an RDD whose values are of a different type from those of the input Pair RDD. Below is an example that calculates the average value for each key in the rdd Pair RDD object we created earlier:

sumCount = rdd.combineByKey(
    (lambda x: (x, 1)),                          # createCombiner: first value seen for a key
    (lambda x, y: (x[0] + y, x[1] + 1)),         # mergeValue: fold another value into the accumulator
    (lambda x, y: (x[0] + y[0], x[1] + y[1])))   # mergeCombiners: merge per-partition accumulators
for key, val in sumCount.collect():
    print("{} {}".format(key, val))
# 1 (2, 1)
# 3 (10, 2)

# NOTE: in Python 2 you could unpack the pair directly with lambda (key, xy): ...,
# but tuple parameters were removed in Python 3, so we index into the pair instead.
avg = sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1])).collectAsMap()

print(avg)
# {1: 2.0, 3: 5.0}

Let's deconstruct the combineByKey() call. combineByKey() goes through every element of each partition and checks whether it has seen that element's key before within that partition. If it has not, it uses the first function provided, lambda x: (x, 1), to create the initial value of the accumulator for that key. If it has seen the key before, it uses the second function provided, lambda x, y: (x[0] + y, x[1] + 1), passing in the current accumulator for that key along with the new value.

The final function provided, lambda x, y: (x[0] + y[0], x[1] + y[1]), is used to merge the per-partition accumulators for each key, since accumulators are built at the partition level and the same key can have an accumulator in multiple partitions.
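
If the three lambdas are hard to read, the same logic can be written with named functions; a minimal sketch, equivalent to the combineByKey() call above:

def create_combiner(value):
    # first time a key is seen in a partition: start a (sum, count) accumulator
    return (value, 1)

def merge_value(acc, value):
    # key already seen in this partition: fold the new value into the accumulator
    return (acc[0] + value, acc[1] + 1)

def merge_combiners(acc1, acc2):
    # combine the per-partition accumulators for the same key
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

sumCount = rdd.combineByKey(create_combiner, merge_value, merge_combiners)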

Conclusion

You should now know how to create Pair RDD objects and how to use some of the most common Pair RDD transformations. Check out the next article to learn more about Pair RDD objects: Apache Spark: Creating Pair RDDs and Associated Transformations, Part 2
