How do I get a SQL row_number equivalent for a Spark RDD?
I need to generate a full list of row_numbers for a data table with many columns.
In SQL, this would look like this:
select
    key_value,
    col1,
    col2,
    col3,
    row_number() over (partition by key_value order by col1, col2 desc, col3)
from
    temp
;
Now, let's say in Spark I have an RDD of the form (K, V), where V=(col1, col2, col3), so my entries are like
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc. and have a new RDD with the correct row_number
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)
How do I do this?
Here's my first attempt:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
Also note that the function sortBy cannot be applied directly to an RDD; one must run collect() first, and then the output is not an RDD but an array:
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
Here's a little more progress, but still not partitioned:
val temp2 = sc.parallelize(
  temp1.map(a => (a._1, (a._2, a._3, a._4)))
    .collect()
    .sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)
).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
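One possible way to turn the global index above into a per-key row number is to subtract each key's smallest index. The following is a minimal local sketch in Python, not Spark code: plain lists stand in for the RDD, the name `per_key_row_numbers` is made up for illustration, and it assumes the global sort keeps all rows of a key next to each other (which holds for this sample, but in general means sorting by the key first).

```python
# Illustrative only: a plain Python list stands in for the contents of temp2.
rows = [
    ((1, 2), 1, 4, 7, 1),
    ((1, 2), 1, 2, 3, 2),
    ((1, 2), 2, 2, 3, 3),
    ((3, 4), 5, 5, 5, 4),
    ((3, 4), 5, 5, 9, 5),
    ((3, 4), 7, 5, 5, 6),
]

def per_key_row_numbers(rows):
    """Replace the trailing global index with a number that restarts per key."""
    # Smallest global index seen for each key (the first tuple element).
    first_index = {}
    for row in rows:
        key, index = row[0], row[-1]
        first_index[key] = min(index, first_index.get(key, index))
    # Offset every index by its key's smallest index so numbering starts at 1.
    return [row[:-1] + (row[-1] - first_index[row[0]] + 1,) for row in rows]

result = per_key_row_numbers(rows)
for row in result:
    print(row)
```

On an actual RDD, the per-key minimum could presumably come from a reduceByKey followed by a join back onto the indexed rows; that part is an assumption and is not shown here.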
sql apache-spark row-number rdd
edited Sep 8 '17 at 21:16
hi-zir
19.4k62663
asked Nov 20 '14 at 21:51
Glenn Strycker
2,19151740
This question is an extension of several other partially answered questions, namely stackoverflow.com/questions/23838614/…, qnalist.com/questions/5086896/…, mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/…, stackoverflow.com/questions/27022059/…, stackoverflow.com/questions/24677180/…
– Glenn Strycker
Nov 20 '14 at 22:03
I'm also looking to answer this question. Hive added analytic functions (including row_number()) in 0.11, and Spark 1.1 supports HiveQL / Hive 0.12. So it seems that sqlContext.hql("select row_number() over(partition by ... should work, but I'm getting an error.
– dnlbrky
Nov 23 '14 at 3:52
4 Answers
The row_number() over (partition by ... order by ...) functionality was added to Spark 1.4. This answer uses PySpark/DataFrames.
Create a test DataFrame:
from pyspark.sql import Row, functions as F
testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()
Add the partitioned row number:
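from pyspark.sql.window import Window
# Note: rowNumber() is the Spark 1.4/1.5 name; it was renamed to row_number() in Spark 1.6.
(testDF
    .select("k", "v",
        F.rowNumber()
            .over(Window
                .partitionBy("k")
                # Ordering by the partition key alone leaves the order within each key unspecified.
                .orderBy("k")
            )
            .alias("rowNum")
    )
    .show()
)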
+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
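Because the window above is ordered by k, the numbering within each key is not tied to the question's order by col1, col2 desc, col3. As a cross-check for what that window expression should return, here is a plain-Python reference that needs no Spark. It is only a sketch: the function name `row_number_reference` is hypothetical, and negating col2 to get descending order assumes the column is numeric.

```python
from itertools import groupby

# Sample rows from the question as (key, (col1, col2, col3)).
data = [
    ("key1", (1, 2, 3)),
    ("key1", (1, 4, 7)),
    ("key1", (2, 2, 3)),
    ("key2", (5, 5, 5)),
    ("key2", (5, 5, 9)),
    ("key2", (7, 5, 5)),
]

def row_number_reference(rows):
    """Emulate row_number() over (partition by key order by col1, col2 desc, col3)."""
    # Sort by key first so each partition is contiguous, then by the window order.
    ordered = sorted(rows, key=lambda kv: (kv[0], kv[1][0], -kv[1][1], kv[1][2]))
    numbered = []
    for key, group in groupby(ordered, key=lambda kv: kv[0]):
        # enumerate(..., start=1) restarts the count for every key.
        for n, (_, value) in enumerate(group, start=1):
            numbered.append((key, value, n))
    return numbered

expected = row_number_reference(data)
for row in expected:
    print(row)
```

For this sample, (1,4,7) comes before (1,2,3) within key1 because col2 is ordered descending, which matches the numbering requested in the question.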
answered Jun 26 '15 at 16:13
dnlbrky
5,05023652
This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.
Here is how I would tackle it:
1- Simplify your data:
temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
temp2 is now a "real" key-value pair RDD. It looks like this:
[
((3, 4), (5, 5, 5)),
((3, 4), (5, 5, 9)),
((3, 4), (7, 5, 5)),
((1, 2), (1, 2, 3)),
((1, 2), (1, 4, 7)),
((1, 2), (2, 2, 3))
]
2- Then, use the group-by function to reproduce the effect of the PARTITION BY:
temp3 = temp2.groupByKey()
temp3 is now an RDD with 2 rows:
[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
3- Now you need to apply a ranking function to the values of each key. In Python, I would use the built-in sorted function (enumerate creates your row_number column):
temp4 = temp3.flatMap(lambda x: [(x[0], (v, i)) for i, v in enumerate(sorted(x[1]))]).take(10)
Note that to implement your particular order, you would need to pass the right "key" argument to sorted. In Python, I would just use a lambda function like this:
lambda t: (t[0], -t[1], t[2])
In the end (without the key argument), the result looks like this:
[
((1, 2), ((1, 2, 3), 0)),
((1, 2), ((1, 4, 7), 1)),
((1, 2), ((2, 2, 3), 2)),
((3, 4), ((5, 5, 5), 0)),
((3, 4), ((5, 5, 9), 1)),
((3, 4), ((7, 5, 5), 2))
]
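Steps 2 and 3 can also be folded into one small helper that applies the key function and numbers rows from 1 instead of 0. This is a sketch under stated assumptions: `number_group` and `grouped` are hypothetical names, and a plain list of (key, values) pairs stands in for the grouped RDD so the snippet runs without Spark.

```python
def number_group(key, values, order_key=lambda v: (v[0], -v[1], v[2])):
    """Return (key, (value, row_number)) pairs for one key, numbered from 1."""
    # The default order_key encodes: col1 ascending, col2 descending, col3 ascending.
    ranked = sorted(values, key=order_key)
    return [(key, (value, n)) for n, value in enumerate(ranked, start=1)]

# Local stand-in for the grouped RDD: each entry is (key, iterable of values).
grouped = [
    ((1, 2), [(1, 2, 3), (1, 4, 7), (2, 2, 3)]),
    ((3, 4), [(5, 5, 5), (5, 5, 9), (7, 5, 5)]),
]

# Flatten the per-key lists, as flatMap would do on an RDD.
numbered = [pair for key, values in grouped for pair in number_group(key, values)]
for pair in numbered:
    print(pair)
```

With PySpark, the same helper would presumably be applied as temp3.flatMap(lambda kv: number_group(kv[0], kv[1])). Note that each key's values are sorted in memory on a single worker, so this suits keys with a moderate number of rows.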
Hope that helps!
Good luck.
answered Nov 21 '14 at 12:52
Guillaume G
766714
val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))
// test: Seq[(String, (Int, Int, Int))] = List((key1,(1,2,3)), (key1,(4,5,6)), (key2,(7,8,9)), (key2,(0,1,2)))
test.foreach(println)
// (key1,(1,2,3))
// (key1,(4,5,6))
// (key2,(7,8,9))
// (key2,(0,1,2))
val rdd = sc.parallelize(test, 2)
// rdd: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ParallelCollectionRDD[41] at parallelize at <console>:26
val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))
// rdd1: org.apache.spark.rdd.RDD[(String, Array[((Int, Int, Int), Int)])] = MapPartitionsRDD[44] at map at <console>:25
val rdd2 = rdd1.flatMap{
  elem =>
    val key = elem._1
    elem._2.map(row => (key, row._1, row._2))
}
// rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int, Int), Int)] = MapPartitionsRDD[45] at flatMap at <console>:25
rdd2.collect.foreach(println)
// (key1,(1,2,3),0)
// (key1,(4,5,6),1)
// (key2,(0,1,2),0)
// (key2,(7,8,9),1)
answered Aug 2 '18 at 15:50
Wallace Huang
112
what's your point ?
– Wallace Huang
Aug 11 '18 at 15:47
Using Spark SQL:
Read the data files...
val df = spark.read.json("s3://s3bukcet/key/activity/year=2018/month=12/date=15/*")
The above file has the fields user_id, pageviews and clicks.
Generate the activity ID (row_number), partitioned by user_id and ordered by clicks:
import org.apache.spark.sql.functions
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.DataTypes
val output = df.withColumn("activity_id", functions.row_number().over(Window.partitionBy("user_id").orderBy("clicks")).cast(DataTypes.IntegerType))
answered Dec 27 '18 at 20:03
Dakshin
415