If you’ve ever worked with Spark on any kind of time-series analysis, you’ve probably reached the point where you need to join two DataFrames based on the time difference between their timestamp fields.
For the purpose of this post, let’s assume we have a DataFrame with events data, and another one with measurements (couldn’t be more generic than that…). Both have timestamp fields (eventTime and measurementTime), and we want to join every event with the measurements that were recorded in the hour before it.
A naive approach (just specifying this as the range condition) would result in a full cartesian product and a filter that enforces the condition (tested using Spark 2.0). This has a horrible effect on performance, especially if the DataFrames contain more than a few hundred thousand records.
While the Spark developers are working on a more generic solution (see the GitHub issue here), there are still use-cases where we can greatly improve performance even with the join strategies that are currently available. One of them is the one described above (joining events to measurements from the hour before them), and I believe it’s a very common one. In this post I’ll briefly go over the implementation that worked for me, and if your use-case is different, you can probably play with it a bit so it addresses yours too.
The Data
To keep everything simple, we’ll work with the following DataFrames. You can obviously work with your own classes, as long as they have a timestamp or a numeric field. This example will use timestamps.
Events
Random Events can be generated using the following code:
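A minimal sketch of such a generator (the case class name, step size and randomness are my own assumptions; only the column names eid, eventTime and eventType come from the sample output later in the post):

```scala
import java.sql.Timestamp
import scala.util.Random
import org.apache.spark.sql.DataFrame
import spark.implicits._   // assumes a SparkSession named `spark`, e.g. in spark-shell

case class Event(eid: Int, eventTime: Timestamp, eventType: String)

val eventTypes = Seq("LoginEvent", "PurchaseEvent")

// n random events, roughly one every stepSecs seconds, going back from now.
def randomEvents(n: Int, stepSecs: Int = 10): DataFrame = {
  val now = System.currentTimeMillis()
  (1 to n).map { i =>
    Event(i, new Timestamp(now - i * stepSecs * 1000L - Random.nextInt(3000)),
      eventTypes(Random.nextInt(eventTypes.size)))
  }.toDF()
}

val events = randomEvents(10)
```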
Measurements
Similarly, Measurements can be faked using this code:
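A similar sketch for measurements, continuing from the snippet above (again, only the column names mid, measurementTime and value come from the post):

```scala
case class Measurement(mid: Int, measurementTime: Timestamp, value: Double)

// n random measurements, roughly one every stepSecs seconds, going back from now.
def randomMeasurements(n: Int, stepSecs: Int = 10): DataFrame = {
  val now = System.currentTimeMillis()
  (1 to n).map { i =>
    Measurement(i, new Timestamp(now - i * stepSecs * 1000L - Random.nextInt(3000)),
      Random.nextDouble())
  }.toDF()
}

val measurements = randomMeasurements(10)
```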
The Naive Approach
If we choose to just join the DataFrames and specify the range condition, we’d get the following:
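A minimal sketch of that naive join for the timestamp case (the numeric case just compares the fields directly); the exact condition below is my own phrasing of the "60 minutes before the event" requirement:

```scala
import org.apache.spark.sql.functions.expr

// Join every event with all measurements recorded in the 60 minutes before it.
val naive = events.join(measurements,
  expr("measurementTime BETWEEN eventTime - INTERVAL 60 MINUTES AND eventTime"))

// Print the physical plan Spark chose for this join.
naive.explain()
```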
The important thing to look at is how Spark plans to perform the join we’ve defined on our two DataFrames. Running explain, both on a timestamp-field join and on a numeric-field join, gives us essentially the same plan.
The first row of that plan is the key: it indicates that Spark is going to resolve our request by performing a cartesian product of the two DataFrames. Notice that if the number of records in one of the DataFrames is small enough, Spark will broadcast it to all machines and perform a BroadcastNestedLoopJoin (with a BroadcastExchange). This is better, but it still isn’t a solution, since we want to work with large data sets.
The Bucketing, Double-Joining and Filtering Approach
Now let’s take advantage of our less generic use-case. We know that we’re only interested in measurements that happened up to 60 minutes before the event, so every event really only needs to be matched against its local (time-wise) environment, and a full cartesian product is just a waste of computing effort. We would basically like to group records together and join only groups of records that are close in time.
Let’s start by grouping the records in both DataFrames into 60-minute intervals:
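A sketch of that bucketing, assuming the DataFrames above (the column name timeFrame and the integer division over unix_timestamp are my own choices):

```scala
import org.apache.spark.sql.functions._

val windowSize = 60 * 60  // 60-minute windows, in seconds

// Bucket every record into its 60-minute window by integer-dividing its epoch seconds.
val eventsByWindow = events
  .withColumn("timeFrame", floor(unix_timestamp(col("eventTime")) / windowSize))

val measurementsByWindow = measurements
  .withColumn("timeFrame", floor(unix_timestamp(col("measurementTime")) / windowSize))
```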
If we look at a specific event record, no matter where it’s located within its timeframe window (the first minute or the last minute of the 60-minute window), we can guarantee that all of its matching measurements will be linked either to the same window or to the one before it:
What we basically want to do is group all events into timeframes, and link every measurement to both its matching window and the one after it. Then we can join by the window column and filter for the exact 60 minutes before the event (since the two frames will give us more than that). The same technique can be applied to numeric fields as well (grouping into windows is simply integer division). The following code does that for both cases:
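Here is a sketch of how that could look. The function name rangeJoin, its parameter list and the window-column names are my own, and the range is passed in the join field’s own units (seconds for timestamps, plain numbers for numeric fields) rather than as an interval string:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

def rangeJoin(events: DataFrame, measurements: DataFrame,
              eventField: String, measurementField: String,
              rangeBack: Long): DataFrame = {

  // Turn the join field into a numeric "position": epoch seconds for
  // timestamps, the raw value for numeric fields.
  def pos(df: DataFrame, field: String) =
    if (df.schema(field).dataType == TimestampType) unix_timestamp(col(field))
    else col(field)

  // 1. Bucket both sides into windows of size `rangeBack` (integer division).
  val e = events.withColumn("eWindow", floor(pos(events, eventField) / rangeBack))

  // 2. Link every measurement to its own window AND the one after it, so an
  //    event in window W sees measurements from windows W and W-1.
  val mBucket = floor(pos(measurements, measurementField) / rangeBack)
  val m = measurements.withColumn("mWindow", explode(array(mBucket, mBucket + 1)))

  // 3. Equi-join on the window columns, then filter down to the exact range.
  e.join(m, col("eWindow") === col("mWindow"))
    .filter((pos(e, eventField) - pos(m, measurementField)).between(0, rangeBack))
    .drop("eWindow", "mWindow")
}
```

With the DataFrames above, the "hour before" join would then be rangeJoin(events, measurements, "eventTime", "measurementTime", 60 * 60).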
As you can see, most of it is just handling both timestamps and numerics. The logic itself is pretty straightforward.
Let’s look at the execution plan now:
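Assuming the rangeJoin sketch above, the check would be:

```scala
// Inspect the physical plan for the bucketed join. With inputs large enough to
// rule out the broadcast path, the equi-join on the window columns is resolved
// as a SortMergeJoin, followed by a filter for the exact range.
rangeJoin(events, measurements, "eventTime", "measurementTime", 60 * 60).explain()
```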
When we run this (with relatively large DataFrames, to avoid the broadcast optimization), the resulting physical plan shows a SortMergeJoin on the window columns instead of a CartesianProduct, and that is the key here.
A sanity check
The events DataFrame contained 10 events (one every ~10 seconds), and the measurements DataFrame contained 10 measurements, also around 10 seconds apart. Below is the result of the join for rangeBack="30 seconds" (rows truncated):
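Assuming the rangeJoin sketch above (which takes the range in seconds rather than as the string "30 seconds"), the call producing this output would look roughly like:

```scala
rangeJoin(events, measurements, "eventTime", "measurementTime", 30)
  .select("eid", "eventTime", "eventType", "mid", "measurementTime", "value")
  .show()
```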
| eid | eventTime | eventType | mid | measurementTime | value |
|---|---|---|---|---|---|
| 3 | 18:24:28 | LoginEvent | 6 | 18:24:02 | 0.12131363910425985 |
| 3 | 18:24:28 | LoginEvent | 5 | 18:24:09 | 0.12030715258495939 |
| 3 | 18:24:28 | LoginEvent | 4 | 18:24:21 | 0.7604318153406678 |
| 4 | 18:24:18 | LoginEvent | 7 | 18:23:52 | 0.6037143578435027 |
| 4 | 18:24:18 | LoginEvent | 6 | 18:24:02 | 0.12131363910425985 |
| 4 | 18:24:18 | LoginEvent | 5 | 18:24:09 | 0.12030715258495939 |
| 5 | 18:24:08 | PurchaseEvent | 8 | 18:23:39 | 0.1435668838975337 |
| 5 | 18:24:08 | PurchaseEvent | 7 | 18:23:52 | 0.6037143578435027 |
| 5 | 18:24:08 | PurchaseEvent | 6 | 18:24:02 | 0.12131363910425985 |
| … | … | … | … | … | … |
Benchmarking
In order to estimate the performance boost, I launched a Google Dataproc cluster of 4 regular machines (plus a master) and tried different sizes of DataFrames. The results are below:
| # Rows (Naive Approach) | Time (secs) | # Rows (Efficient Approach) | Time (secs) |
|---|---|---|---|
| 10K | 7.85 | 10K | 1.57 |
| 20K | 7.81 | 50K | 2.48 |
| 50K | 43.40 | 100K | 3.08 |
| 80K | 97.73 | 500K | 14.09 |
| 130K | 265.43 | 1M | 13.06 |
| 200K | 736.58 | 2M | 17.49 |
|  |  | 5M | 85.52 |
|  |  | 10M | 189.39 |
|  |  | 50M | 252.44 |
|  |  | 100M | 438.15 |
Results are pretty impressive. I could join DataFrames with 50M records each in around the same time it took to join 130K-record DataFrames with the naive approach. I also tried joining larger DataFrames with the naive approach, but since I’m billed by the minute and it started to take hours, I just gave up…
The performance boost is actually around 3 orders of magnitude (if you chart these numbers, note that the x-axis is logarithmic).
What if my use-case is different?
Well, that depends on what exactly you mean by different. If, instead of the last 60 minutes, you want to join anything from 120 to 60 minutes before the event, you can just play with the windows we attach measurements to. If you want to join to future measurements, you basically have to match measurements to past windows instead of the current one and the next one. All of those changes are pretty easy to make; a couple of them are sketched below.
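For illustration, here are hypothetical variants of step 2 of the rangeJoin sketch above, shown for the timestamp case with 60-minute windows:

```scala
import org.apache.spark.sql.functions._

val rangeBack = 60 * 60
val mBucket = floor(unix_timestamp(col("measurementTime")) / rangeBack)

// "From 120 to 60 minutes before the event": link each measurement to the two
// windows ahead of it (and change the final filter to .between(rangeBack, 2 * rangeBack)).
val mPast = measurements.withColumn("mWindow", explode(array(mBucket + 1, mBucket + 2)))

// Joining events to future measurements: link each measurement to its own window
// and the one before it (and flip the sign in the final filter).
val mFuture = measurements.withColumn("mWindow", explode(array(mBucket, mBucket - 1)))
```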
If, however, you want to stretch the limits and try to join records where the rangeBack parameter isn’t constant (say it depends on some field of the event), then you’re out of luck with this approach, but I hope it at least gave you some ideas…
Hope that helps, and I actually really hope Spark devs will support range joins through Catalyst so we don’t need to hack our way to efficient joins.