PySpark RDD | map method

Last updated: Jul 1, 2022
Tags: PySpark

PySpark RDD's map(~) method applies a function to each element of the RDD.
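As a mental model (ignoring laziness and partitioning), map(~) transforms each element just like Python's built-in map. A minimal pure-Python analogy, no Spark required:

```python
# Pure-Python stand-in for rdd.map(lambda x: x.lower()):
# every element is passed through the function independently.
data = ["A", "B", "C"]
result = list(map(lambda x: x.lower(), data))
print(result)  # ['a', 'b', 'c']
```

The key difference is that the RDD version is lazy and runs the function per partition across the cluster, rather than eagerly on a local list.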

Parameters

1. f | function

The function to apply.

2. preservesPartitioning | boolean | optional

Whether to let Spark assume that the existing partitioning is still valid after the map. This is only relevant to pair RDDs (RDDs of key-value tuples). Consult the examples below for clarification. By default, preservesPartitioning=False.

Return Value

A PySpark RDD (pyspark.rdd.PipelinedRDD).

Examples

Applying a function to each element of RDD

To make all values in the RDD lowercased:

# Create an RDD with 5 partitions
rdd = sc.parallelize(["A","B","C","D","E","F"], numSlices=5)
new_rdd = rdd.map(lambda x: x.lower())
new_rdd.collect()
['a', 'b', 'c', 'd', 'e', 'f']

Preserving partitioning while applying the map method to an RDD

The preservesPartitioning parameter only comes into play when the RDD contains key-value tuples (a pair RDD).

When an RDD is re-partitioned via partitionBy(~) (using a hash partitioner), Spark guarantees that tuples with the same key end up in the same partition:

rdd = sc.parallelize([("A",1),("B",1),("C",1),("A",1),("D",1)], numSlices=3)
new_rdd = rdd.partitionBy(numPartitions=2)
new_rdd.glom().collect()
[[('C', 1)], [('A', 1), ('B', 1), ('A', 1), ('D', 1)]]

Indeed, we see that the two ('A',1) tuples lie in the same partition.
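Under the hood, a hash partitioner assigns each key to a partition by hashing it modulo the number of partitions. A rough pure-Python sketch of this idea (using Python's built-in hash as a stand-in for PySpark's portable_hash, so the exact partition indices may differ from Spark's):

```python
def assign_partition(key, num_partitions):
    # Stand-in for Spark's hash partitioner: equal keys always
    # hash to the same value, hence land in the same partition.
    return hash(key) % num_partitions

pairs = [("A", 1), ("B", 1), ("C", 1), ("A", 1), ("D", 1)]
partitions = [[] for _ in range(2)]
for key, value in pairs:
    partitions[assign_partition(key, 2)].append((key, value))

# Both ("A", 1) tuples necessarily land in the same partition.
```

This is why partitionBy(~) can guarantee co-location of equal keys: the partition index is a pure function of the key.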

Let us now perform a map(~) operation with preservesPartitioning set to False (default):

mapped_rdd = new_rdd.map(lambda my_tuple: (my_tuple[0], my_tuple[1]+3))
mapped_rdd.glom().collect()
[[('C', 4)], [('A', 4), ('B', 4), ('A', 4), ('D', 4)]]

Here, the map(~) returns tuples with the same keys but different values, and the output shows that the physical partitioning has not changed. Behind the scenes, however, Spark keeps an internal flag indicating whether the partitioning is still valid, and because preservesPartitioning defaulted to False, that flag now marks the partitioning as destroyed. This is overly conservative: the keys were not modified, so the partitioning is in fact still valid, but Spark cannot know that without inspecting our function.

We can confirm that Spark no longer knows the data is partitioned by key by performing a shuffling operation like reduceByKey(~):

mapped_rdd_reduced = mapped_rdd.reduceByKey(lambda x, y: x + y)
print(mapped_rdd_reduced.toDebugString().decode("utf-8"))
(2) PythonRDD[238] at RDD at PythonRDD.scala:58 []
| MapPartitionsRDD[237] at mapPartitions at PythonRDD.scala:183 []
| ShuffledRDD[236] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[235] at reduceByKey at <command-1339085475381822>:1 []
| PythonRDD[234] at reduceByKey at <command-1339085475381822>:1 []
| MapPartitionsRDD[223] at mapPartitions at PythonRDD.scala:183 []
| ShuffledRDD[222] at partitionBy at <unknown>:0 []
+-(3) PairwiseRDD[221] at partitionBy at <command-1339085475381815>:2 []
| PythonRDD[220] at partitionBy at <command-1339085475381815>:2 []
| ParallelCollectionRDD[219] at readRDDFromInputStream at PythonRDD.scala:413 []

You can see that a shuffle has indeed occurred. However, it is completely unnecessary: we know that tuples with the same key already reside in the same partition (machine), so the reduction could have been done locally.
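To see why no data movement is needed in this situation, note that when every occurrence of a key is confined to a single partition, reduceByKey(~) can be computed independently within each partition. A pure-Python sketch of that per-partition reduction (reduce_by_key_local is a hypothetical helper, not a Spark API):

```python
def reduce_by_key_local(partition, f):
    # Reduce values key-by-key within one partition. When each key
    # is confined to a single partition, no data needs to cross
    # partition boundaries to get the final per-key totals.
    acc = {}
    for key, value in partition:
        acc[key] = f(acc[key], value) if key in acc else value
    return list(acc.items())

# The partitions produced by the map(~) example above
partitions = [[("C", 4)], [("A", 4), ("B", 4), ("A", 4), ("D", 4)]]
result = [reduce_by_key_local(p, lambda x, y: x + y) for p in partitions]
print(result)  # [[('C', 4)], [('A', 8), ('B', 4), ('D', 4)]]
```

This local computation is exactly what Spark can do when it trusts the partitioning, and what the shuffle above needlessly redoes from scratch.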

Now, consider the case when we set preservesPartitioning to True:

mapped_rdd_preserved = new_rdd.map(lambda my_tuple: (my_tuple[0], my_tuple[1]+3), preservesPartitioning=True)
mapped_rdd_preserved_reduced = mapped_rdd_preserved.reduceByKey(lambda x, y: x + y)
print(mapped_rdd_preserved_reduced.toDebugString().decode("utf-8"))
(2) PythonRDD[239] at RDD at PythonRDD.scala:58 []
| MapPartitionsRDD[223] at mapPartitions at PythonRDD.scala:183 []
| ShuffledRDD[222] at partitionBy at <unknown>:0 []
+-(3) PairwiseRDD[221] at partitionBy at <command-1339085475381815>:2 []
| PythonRDD[220] at partitionBy at <command-1339085475381815>:2 []
| ParallelCollectionRDD[219] at readRDDFromInputStream at PythonRDD.scala:413 []

We can see that no shuffle has occurred. This is because we told Spark that we only changed the value of each tuple, not the key, so Spark assumes the original partitioning is still intact.

Published by Isshin Inada