When processing SQL data in Spark, sometimes single rows are not the right unit — your calculation may need a collection or group of records as input.
This post shows how to group SQL data for use in Spark.
Selecting from SQL
There are many other examples documenting how to query SQL data into Spark, so I am going to focus on the grouping in particular.
Here’s an example query, fetching Orders along with their Customer IDs. This returns individual order rows, but later we are going to process them per-Customer.
First, we write the query we want as a “query expression” or inline view:
val ordersQuery = """
  (SELECT customer_id, product_id, order_date, quantity
   FROM orders
   WHERE order_status = 'A'
     AND (effective_date BETWEEN SYSDATE-365 AND SYSDATE)
  ) orderQ
"""
Then we ask Spark to read from this, by passing the query expression as the JDBC “dbtable” option.
val ordersDF = spark.read
  .format("jdbc")
  .option("url", dbUrl)
  .option("driver", "oracle.jdbc.driver.OracleDriver")
  .option("user", dbUser)
  .option("password", dbPwd)
  .option("dbtable", ordersQuery)
  .load()
Grouping Items for Processing
Let’s say we want an algorithm that calculates Customer sales value or special offers. This should be informed by their Orders, but is typically easiest to implement as a per-Customer section of code (particularly if we need to combine other per-customer inputs).
However, our SQL data arrives in Spark as individual Orders, which does not directly fit a per-Customer algorithm. How can we solve this?
We need two things to fix this: 1) group the data by Customer, and 2) aggregate each group of Orders into a list. This will result in a Spark Dataset where each row relates to a single customer, but holds all of that customer’s orders.
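Before bringing Spark into it, the two steps can be sketched with plain Scala collections. This is only an illustration of the shape we are aiming for; the Order case class and its field names are hypothetical and are not part of the query code in this post.

```scala
// Hypothetical Order record, for illustration only.
final case class Order(customerId: Long, productId: Long, quantity: Int)

object GroupOrdersSketch {
  // Step 1: group by customer. Step 2: collect each group's orders into a list.
  def ordersByCustomer(orders: Seq[Order]): Map[Long, List[Order]] =
    orders.groupBy(_.customerId).map { case (id, os) => id -> os.toList }

  def main(args: Array[String]): Unit = {
    val orders = Seq(Order(1L, 100L, 2), Order(2L, 101L, 1), Order(1L, 102L, 5))
    // Each entry now relates to one customer and holds all of their orders.
    ordersByCustomer(orders).foreach { case (id, os) =>
      println(s"customer $id has ${os.size} orders")
    }
  }
}
```

The Spark version has the same shape: a key, plus a collected list of everything that shares that key.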
To do this in Spark, we map the records into a tuple structure, for use with the ‘groupBy’ and ‘agg’ operators. A key point here is that the items to be grouped should be mapped as a nested tuple; this makes them a single column within the overall tuple, which ‘collect_list’ can then gather into an array.
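// needed for the tuple encoders used by map, and for collect_list
import spark.implicits._
import org.apache.spark.sql.functions.collect_list

// structure into (Customer, array<Order>)
val orderS = ordersDF
  .map( r => ( r.getDecimal(0),
      (r.getDecimal(1), r.getTimestamp(2), r.getDecimal(3))))
  .groupBy( "_1")
  .agg( collect_list("_2"))
  .toDF( "customer_id", "orders")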
We now have a Spark DataFrame with two columns: Customer ID and an array of Order tuples. This can be processed by any per-customer algorithm, or even joined with other per-customer DataFrames for more complex pipelines.
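As a minimal sketch of such a per-customer step, the example below totals the quantity ordered by each customer. It makes several assumptions for the sake of being self-contained: a local SparkSession, in-memory rows instead of the JDBC source, and Long/Int values rather than the decimals and timestamps Oracle would return. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

object PerCustomerSketch {
  // Input rows are (customer_id, (product_id, quantity)): simplified stand-ins
  // for the mapped Order tuples.
  def totalQuantities(spark: SparkSession,
                      orders: Seq[(Long, (Long, Int))]): Map[Long, Int] = {
    import spark.implicits._

    // Same group-then-collect shape as in the post.
    val grouped = orders.toDS()
      .groupBy("_1")
      .agg(collect_list("_2"))
      .toDF("customer_id", "orders")

    // Read each row back as (customer, orders) and run per-customer logic on it.
    grouped
      .as[(Long, Seq[(Long, Int)])]
      .map { case (customerId, custOrders) =>
        (customerId, custOrders.map(_._2).sum)
      }
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("per-customer-sketch")
      .getOrCreate()
    val totals = totalQuantities(
      spark, Seq((1L, (100L, 2)), (1L, (102L, 5)), (2L, (101L, 1))))
    totals.foreach { case (id, qty) => println(s"customer $id ordered $qty items") }
    spark.stop()
  }
}
```

The body of the map is where a real sales-value or special-offer calculation would go; it sees one customer at a time, with all of that customer's orders in hand.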
Note that at Spark 2.3.2, I was only able to achieve this grouping by de-structuring to tuples, which lost the original attribute names from SQL; that is why they are respecified with ‘toDF’ at the end. Unfortunately the grouped Product ID, Order Date and Quantity remain anonymous tuple attributes (_1, _2, _3). Let me know if you find a better way!
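One possible alternative, offered as a sketch rather than something verified against the Oracle setup in this post: the ‘struct’ function in org.apache.spark.sql.functions packs named columns into a single nested column, so ‘collect_list’ can gather them without a detour through tuples. The local SparkSession, the in-memory rows, the simplified column types and the names NamedGroupingSketch and groupOrders are all assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{collect_list, struct}

object NamedGroupingSketch {
  // Groups order rows by customer, keeping the order columns' names
  // inside the collected array.
  def groupOrders(ordersDF: DataFrame): DataFrame =
    ordersDF
      .groupBy("customer_id")
      .agg(collect_list(struct("product_id", "order_date", "quantity")).as("orders"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("named-grouping-sketch")
      .getOrCreate()
    import spark.implicits._

    // In-memory stand-in for the JDBC-backed DataFrame (types simplified).
    val ordersDF = Seq(
      (1L, 100L, "2018-11-01", 2),
      (1L, 102L, "2018-11-03", 5),
      (2L, 101L, "2018-11-02", 1)
    ).toDF("customer_id", "product_id", "order_date", "quantity")

    val ordersByCustomer = groupOrders(ordersDF)
    ordersByCustomer.printSchema()
    ordersByCustomer.show(truncate = false)
    spark.stop()
  }
}
```

With this approach the elements of the orders array are structs whose fields keep the names product_id, order_date and quantity, so downstream code can refer to them by name instead of by tuple position.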