Comprehensive guide on caching in PySpark
To follow along with this guide, you should know what RDDs, transformations and actions are. Please visit our comprehensive guide on RDD if you feel rusty!
What is caching in Spark?
The core data structure used in Spark is the resilient distributed dataset (RDD). There are two types of operations one can perform on an RDD: transformations and actions. Most operations such as mapping and filtering are transformations. Whenever a transformation is applied to an RDD, a new RDD is created instead of the original RDD being mutated directly.
For example, applying the map transformation on the original RDD creates RDD', and then applying the filter transformation on RDD' creates RDD''.
Now here is where caching comes into play. Suppose we wanted to run multiple actions on RDD'' (or on RDDs derived from it). Without caching, RDD'' must be computed from scratch from RDD each time. This means that if we run 10 actions on RDD'', then RDD'' must be generated 10 times from RDD. If we cache RDD'', then we no longer have to recompute RDD'', but can instead reuse the RDD'' that exists in the cache. In this way, caching can greatly speed up your computations and is therefore an important tool for optimizing your PySpark code.
How to perform caching in PySpark?
Caching an RDD or a DataFrame can be done by calling the RDD's or DataFrame's cache() method. The catch is that cache() is lazy, like a transformation, rather than being an action. This means that even if you call cache() on an RDD or a DataFrame, Spark will not immediately cache the data. Spark will only cache the data once an action such as count() is performed on it. Note that df.cache() returns the cached PySpark DataFrame (the same DataFrame object, now marked for caching).
We could also perform caching via the persist() method. The difference between cache() and persist() is that cache() always uses the default storage level (MEMORY_AND_DISK for DataFrames; for RDDs the default is MEMORY_ONLY), whereas persist() allows you to specify storage levels other than the default. MEMORY_AND_DISK means that the cache will be stored in memory if possible, otherwise it will be stored on disk. Other storage levels include MEMORY_ONLY and DISK_ONLY.
Basic example of caching in PySpark
Consider the following PySpark DataFrame:
df = spark.createDataFrame([["Alex", 20], ["Bob", 30], ["Cathy", 40]], ["name", "age"])
df.show()
+-----+---+
| name|age|
+-----+---+
| Alex| 20|
|  Bob| 30|
|Cathy| 40|
+-----+---+
Let's print out the execution plan of the filter(~) operation that fetches rows where the age is not 20:
== Physical Plan ==
*(1) Filter (isnotnull(age#6003L) AND NOT (age#6003L = 20))
+- *(1) Scan ExistingRDD[name#6002,age#6003L]
Here note the following:
- the explain() method prints out the physical plan, which you can interpret as the actual execution plan
- the physical plan is typically read from bottom to top
- we see that PySpark first scans the DataFrame (Scan ExistingRDD). RDD is shown here instead of DataFrame because, remember, DataFrames are implemented as RDDs under the hood.
- while scanning, the filtering (isnotnull(age) AND NOT (age = 20)) is applied.
Let us now cache the PySpark DataFrame returned by the filter(~) method using cache(). Because cache() is lazy, we follow it with count(): the count() method is an action, which means that the PySpark DataFrame returned by filter(~) will actually be cached. We then call filter(~) again and print the physical plan:
== Physical Plan ==
InMemoryTableScan [name#6002, age#6003L]
   +- InMemoryRelation [name#6002, age#6003L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Filter (isnotnull(age#6003L) AND NOT (age#6003L = 20))
            +- *(1) Scan ExistingRDD[name#6002,age#6003L]
The physical plan is now different from when we called filter(~) before caching. We see two new operations: InMemoryTableScan and InMemoryRelation. Behind the scenes, the cache manager checks whether a DataFrame resulting from the same computation exists in the cache. In this case, we previously cached the DataFrame resulting from filter('age != 20') via cache() followed by an action (count()), so the cache manager uses this cached DataFrame instead of recomputing filter('age != 20'). The InMemoryTableScan and InMemoryRelation operations in the physical plan indicate that we are working with the cached version of the DataFrame; the Filter and Scan steps nested under InMemoryRelation only describe how the cached data was originally computed.
Using the cached object explicitly
Both cache() and persist() return a cached version of the RDD or DataFrame. As we saw in the example above, we can cache RDDs or DataFrames without explicitly using the returned cached object.
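It is better practice to keep a reference to the cached object returned by cache(), for example df_cached = df.filter('age != 20').cache(), and to use that object for subsequent operations.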
The advantage of this is that calling methods like df_cached.count() clearly indicates that we are using a cached DataFrame.
Confirming cache via Spark UI
We can also confirm the caching behaviour via the Spark UI by clicking on the Stages tab:
Click on the link provided in the Description column. This should open up a graph that shows the operations performed under the hood:
You should see a green box in the middle, which means that this specific operation was not recomputed thanks to the presence of a cache.
Note that if you are using Databricks, then click on View in the output cell:
This should open up the Spark UI and show you the same graph as above.
We can also see the stored caches on the Storage tab:
We can see that all the partitions of the RDD (8 in this case) resulting from the operation filter('age != 20') are stored in the memory cache as opposed to the disk cache. This is because the default storage level of the cache() method is MEMORY_AND_DISK, which means that the cache is stored on disk only if it does not fit in memory.
Clearing existing cache
To clear (evict) all the cache, call the following:
To clear the cache of a specific RDD or DataFrame, call its unpersist() method:
df_cached = df.filter('age != 20').cache()
# Trigger an action to persist the cache
df_cached.count()
# Delete the cache
df_cached.unpersist()
It is good practice to clear caches you no longer need, because if memory starts running out, Spark will begin evicting cached data using the LRU (least recently used) policy. It is generally better not to rely on automatic eviction, because it may remove cached data that is vital for your PySpark application.
Things to consider when caching
Cache computed data that is used frequently
Caching is recommended when you use the same computed RDD or DataFrame multiple times. Do remember that computing RDDs is generally very fast, so you may consider caching only when your PySpark program is too slow for your needs.
We should cache frugally because caching consumes memory, and memory is needed for the worker nodes to perform their tasks. If we do decide to cache, we should make sure that we only cache the part of the data that we will reuse multiple times. For instance, if we are going to frequently perform some computation on column A only, then it makes sense to cache only column A (for example, via select('A')) instead of the entire DataFrame. As another example, if we have two queries where one involves columns A and B and the other involves columns B and C, then it may be a good idea to cache columns A, B and C together instead of caching columns (A and B) and columns (B and C) separately, which would store column B in the cache twice.