How do I get a SQL row_number equivalent for a Spark RDD?












22














I need to generate a full list of row_numbers for a data table with many columns.



In SQL, this would look like this:



select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;


Now, let's say that in Spark I have an RDD of the form (K, V), where V=(col1, col2, col3), so my entries look like this:



(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.


I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc. and end up with a new RDD that carries the correct row_number:



(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.


(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)
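
For reference, the semantics of the SQL query can be modeled locally in plain Python before involving Spark. The sketch below is only an illustration: the function name row_numbers and the string keys are made up for this example, and negating col2 to get descending order assumes numeric columns.

```python
from collections import defaultdict

# Sample rows in the question's (key, (col1, col2, col3)) form.
rows = [
    ("key1", (1, 2, 3)),
    ("key1", (1, 4, 7)),
    ("key1", (2, 2, 3)),
    ("key2", (5, 5, 5)),
    ("key2", (5, 5, 9)),
    ("key2", (7, 5, 5)),
]


def row_numbers(rows):
    """Model row_number() over (partition by key order by col1, col2 desc, col3)."""
    partitions = defaultdict(list)
    for key, value in rows:
        partitions[key].append(value)  # partition by key
    numbered = []
    for key in sorted(partitions):
        # Negating col2 sorts it in descending order (numeric columns only).
        ordered = sorted(partitions[key], key=lambda v: (v[0], -v[1], v[2]))
        for rownum, value in enumerate(ordered, start=1):
            numbered.append((key, value, rownum))
    return numbered


for record in row_numbers(rows):
    print(record)
```

On the sample data this reproduces the numbering listed above, which makes it a convenient oracle for checking any Spark version of the same logic on a small input.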



How do I do this?



Here's my first attempt:



val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???


Also note that I could not apply sortBy directly to the RDD; I had to run collect() first, and then the output is an array rather than an RDD:



temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)


Here's a little more progress, but still not partitioned:



val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)









  • This question is an extension of several other partially answered questions, namely stackoverflow.com/questions/23838614/…, qnalist.com/questions/5086896/…, mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/…, stackoverflow.com/questions/27022059/…, stackoverflow.com/questions/24677180/…
    – Glenn Strycker
    Nov 20 '14 at 22:03










  • I'm also looking to answer this question. Hive added analytic functions (including row_number()) in 0.11, and Spark 1.1 supports HiveQL / Hive 0.12. So it seems that sqlContext.hql("select row_number() over(partition by ... should work, but I'm getting an error.
    – dnlbrky
    Nov 23 '14 at 3:52








































sql apache-spark row-number rdd














edited Sep 8 '17 at 21:16 by hi-zir

asked Nov 20 '14 at 21:51 by Glenn Strycker


























4 Answers


















19














The row_number() over (partition by ... order by ...) functionality was added in Spark 1.4. This answer uses PySpark/DataFrames.



Create a test DataFrame:



from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()


Add the partitioned row number:



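from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         # rowNumber() is the Spark 1.4/1.5 name; it was renamed row_number() in Spark 1.6
         F.rowNumber()
         .over(Window
               .partitionBy("k")
               # ordering by the partition key alone leaves the order within each key unspecified
               .orderBy("k")
         )
         .alias("rowNum")
 )
 .show()
)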

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+


































    4














    This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.



    Here is how I would tackle it:



    1- Simplify your data:



    temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))


    temp2 is now an RDD of "real" key-value pairs. It looks like this:



    [
    ((3, 4), (5, 5, 5)),
    ((3, 4), (5, 5, 9)),
    ((3, 4), (7, 5, 5)),
    ((1, 2), (1, 2, 3)),
    ((1, 2), (1, 4, 7)),
    ((1, 2), (2, 2, 3))


    ]



    2- Then, use the group-by function to reproduce the effect of the PARTITION BY:



    temp3 = temp2.groupByKey()


    temp3 is now an RDD with 2 rows:



    [((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
    ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]


    3- Now, you need to apply a ranking function to the values of each key. In Python, I would use the built-in sorted function (enumerate creates your row_number column, starting at 0):



     temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)


    Note that to implement your particular order, you would need to pass the right "key" argument to sorted (in Python, I would just create a lambda function like this one):



    lambda t: (t[0], -t[1], t[2])


    In the end (without the key argument), the result looks like this:



    [
    ((1, 2), ((1, 2, 3), 0)),
    ((1, 2), ((1, 4, 7), 1)),
    ((1, 2), ((2, 2, 3), 2)),
    ((3, 4), ((5, 5, 5), 0)),
    ((3, 4), ((5, 5, 9), 1)),
    ((3, 4), ((7, 5, 5), 2))


    ]
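
Steps 2 and 3 can be folded into one function that numbers a single group. The sketch below is plain Python so it can be checked without a cluster; number_group is a hypothetical helper name, and it uses repeated stable sorts instead of negation so that the descending column does not have to be numeric. It also starts numbering at 1, as row_number() does.

```python
def number_group(pair):
    """Turn (key, iterable_of_values) into [(key, (value, rownum)), ...]."""
    key, values = pair
    ordered = list(values)
    # Python's sort is stable, so sorting by the least significant column
    # first and the most significant column last gives a multi-column order.
    ordered.sort(key=lambda v: v[2])                # col3 ascending
    ordered.sort(key=lambda v: v[1], reverse=True)  # col2 descending
    ordered.sort(key=lambda v: v[0])                # col1 ascending
    return [(key, (value, rownum))
            for rownum, value in enumerate(ordered, start=1)]


# Local stand-in for the grouped RDD: one (key, values) pair per key.
grouped = [
    ((1, 2), [(1, 2, 3), (1, 4, 7), (2, 2, 3)]),
    ((3, 4), [(5, 5, 5), (5, 5, 9), (7, 5, 5)]),
]
result = [record for pair in grouped for record in number_group(pair)]

for record in result:
    print(record)
```

With the RDDs from this answer, the function would presumably be applied as temp2.groupByKey().flatMap(number_group); that call is not exercised here.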



    Hope that helps!



    Good luck.



































      1














      val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))


      test: Seq[(String, (Int, Int, Int))] = List((key1,(1,2,3)), (key1,(4,5,6)), (key2,(7,8,9)), (key2,(0,1,2)))



      test.foreach(println)


      (key1,(1,2,3))



      (key1,(4,5,6))



      (key2,(7,8,9))



      (key2,(0,1,2))



      val rdd = sc.parallelize(test, 2)


      rdd: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ParallelCollectionRDD[41] at parallelize at :26



      val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))


      rdd1: org.apache.spark.rdd.RDD[(String, Array[((Int, Int, Int), Int)])] = MapPartitionsRDD[44] at map at :25



      val rdd2 = rdd1.flatMap{ 
      elem =>
      val key = elem._1
      elem._2.map(row => (key, row._1, row._2))
      }


      rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int, Int), Int)] = MapPartitionsRDD[45] at flatMap at :25



      rdd2.collect.foreach(println)


      (key1,(1,2,3),0)



      (key1,(4,5,6),1)



      (key2,(0,1,2),0)



      (key2,(7,8,9),1)
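
groupByKey followed by an in-memory sort holds all of a key's values in memory at once. An alternative, sketched here in plain Python rather than Scala, is to sort the records by key and ordering column first and then number them in a single pass, resetting the counter whenever the key changes. number_sorted is a hypothetical helper; in Spark, something like repartitionAndSortWithinPartitions with a partitioner that looks only at the key, followed by mapPartitions, might play the same role, but that is an assumption and not part of the answer above. Unlike the output above, the numbering here starts at 1.

```python
def number_sorted(records):
    """Yield (key, value, rownum) for records already sorted by key and order column."""
    previous_key = object()  # sentinel that never equals a real key
    rownum = 0
    for key, value in records:
        if key != previous_key:
            # A new key starts: restart the counter.
            previous_key = key
            rownum = 0
        rownum += 1
        yield (key, value, rownum)


test = [("key1", (1, 2, 3)), ("key1", (4, 5, 6)),
        ("key2", (7, 8, 9)), ("key2", (0, 1, 2))]

# Sort by key, then by the first value column, as in the answer's sortBy.
ordered = sorted(test, key=lambda kv: (kv[0], kv[1][0]))
numbered = list(number_sorted(ordered))

for record in numbered:
    print(record)
```

Because the function only looks at one record at a time, it never needs a whole group in memory, which is the point of sorting before numbering.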



























      • what's your point ?
        – Wallace Huang
        Aug 11 '18 at 15:47



















      0














      Using Spark SQL (DataFrames):

      Read the data files:

      val df = spark.read.json("s3://s3bukcet/key/activity/year=2018/month=12/date=15/*");



      The above file has fields user_id, pageviews and clicks



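      Generate the activity ID (row_number), partitioned by user_id and ordered by clicks:

      import org.apache.spark.sql.functions, org.apache.spark.sql.expressions.Window, org.apache.spark.sql.types.DataTypes
      val output = df.withColumn("activity_id", functions.row_number().over(Window.partitionBy("user_id").orderBy("clicks")).cast(DataTypes.IntegerType));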






























































The PySpark/DataFrames answer was posted by dnlbrky on Jun 26 '15 at 16:13.

























                4














                This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.



                Here is how I would tackle it:



                1- Simplify your data:



                temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))


                temp2 is now a "real" key-value pair. It looks like that:



                [
                ((3, 4), (5, 5, 5)),
                ((3, 4), (5, 5, 9)),
                ((3, 4), (7, 5, 5)),
                ((1, 2), (1, 2, 3)),
                ((1, 2), (1, 4, 7)),
                ((1, 2), (2, 2, 3))


                ]



                2- Then, use the group-by function to reproduce the effect of the PARTITION BY:



                temp3 = temp2.groupByKey()


                temp3 is now a RDD with 2 rows:



                [((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
                ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]


                3- Now, you need to apply a rank function for each value of the RDD. In python, I would use the simple sorted function (the enumerate will create your row_number column):



                 temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)


                Note that to implement your particular order, you would need to feed the right "key" argument (in python, I would just create a lambda function like those:



                lambda tuple : (tuple[0],-tuple[1],tuple[2])


                At the end (without the key argument function, it looks like that):



                [
                ((1, 2), ((1, 2, 3), 0)),
                ((1, 2), ((1, 4, 7), 1)),
                ((1, 2), ((2, 2, 3), 2)),
                ((3, 4), ((5, 5, 5), 0)),
                ((3, 4), ((5, 5, 9), 1)),
                ((3, 4), ((7, 5, 5), 2))


                ]



                Hope that helps!



                Good luck.






                share|improve this answer


























                  4














                  This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.



                  Here is how I would tackle it:



                  1- Simplify your data:



                  temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))


                  temp2 is now a "real" key-value pair. It looks like that:



                  [
                  ((3, 4), (5, 5, 5)),
                  ((3, 4), (5, 5, 9)),
                  ((3, 4), (7, 5, 5)),
                  ((1, 2), (1, 2, 3)),
                  ((1, 2), (1, 4, 7)),
                  ((1, 2), (2, 2, 3))


                  ]



                  2- Then, use the group-by function to reproduce the effect of the PARTITION BY:



                  temp3 = temp2.groupByKey()


                  temp3 is now a RDD with 2 rows:



                  [((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
                  ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]


                  3- Now, you need to apply a rank function for each value of the RDD. In python, I would use the simple sorted function (the enumerate will create your row_number column):



                   temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)


                  Note that to implement your particular order, you would need to feed the right "key" argument (in python, I would just create a lambda function like those:



                  lambda tuple : (tuple[0],-tuple[1],tuple[2])


                  At the end (without the key argument function, it looks like that):



                  [
                  ((1, 2), ((1, 2, 3), 0)),
                  ((1, 2), ((1, 4, 7), 1)),
                  ((1, 2), ((2, 2, 3), 2)),
                  ((3, 4), ((5, 5, 5), 0)),
                  ((3, 4), ((5, 5, 9), 1)),
                  ((3, 4), ((7, 5, 5), 2))


                  ]



                  Hope that helps!



                  Good luck.






                  share|improve this answer
























                    4












                    4








                    4






                    This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.



                    Here is how I would tackle it:



                    1- Simplify your data:



                    temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))


                    temp2 is now a "real" key-value pair. It looks like that:



                    [
                    ((3, 4), (5, 5, 5)),
                    ((3, 4), (5, 5, 9)),
                    ((3, 4), (7, 5, 5)),
                    ((1, 2), (1, 2, 3)),
                    ((1, 2), (1, 4, 7)),
                    ((1, 2), (2, 2, 3))


                    ]



                    2- Then, use the group-by function to reproduce the effect of the PARTITION BY:



                    temp3 = temp2.groupByKey()


                    temp3 is now a RDD with 2 rows:



                    [((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
                    ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]


                    3- Now, you need to apply a rank function for each value of the RDD. In python, I would use the simple sorted function (the enumerate will create your row_number column):



                     temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)


                    Note that to implement your particular order, you would need to pass the right "key" argument to sorted. In Python, I would just create a lambda function like this one:

                    lambda t: (t[0], -t[1], t[2])

                    In the end (without the key argument), the result looks like this:

                    [
                    ((1, 2), ((1, 2, 3), 0)),
                    ((1, 2), ((1, 4, 7), 1)),
                    ((1, 2), ((2, 2, 3), 2)),
                    ((3, 4), ((5, 5, 5), 0)),
                    ((3, 4), ((5, 5, 9), 1)),
                    ((3, 4), ((7, 5, 5), 2))
                    ]
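To make the ordering from the question concrete (col1 ascending, col2 descending, col3 ascending, numbered from 1 as SQL row_number() does), here is a minimal pure-Python sketch of the per-group step. The function name number_rows is hypothetical, and negating a column to get descending order only works for numeric columns.

```python
def number_rows(key, values):
    """Return (key, (row, row_number)) ordered by col1 ASC, col2 DESC, col3 ASC."""
    # Negating the second column sorts it in descending order (numeric columns only).
    ordered = sorted(values, key=lambda t: (t[0], -t[1], t[2]))
    # start=1 gives 1-based numbers, like SQL row_number().
    return [(key, (row, n)) for n, row in enumerate(ordered, start=1)]


rows = number_rows((1, 2), [(1, 2, 3), (1, 4, 7), (2, 2, 3)])
for r in rows:
    print(r)
# ((1, 2), ((1, 4, 7), 1))
# ((1, 2), ((1, 2, 3), 2))
# ((1, 2), ((2, 2, 3), 3))
```

Assuming the grouped RDD from step 2, this could probably be plugged in as temp3.flatMap(lambda kv: number_rows(kv[0], kv[1])). Each group is sorted in memory on a single executor, so this suits keys whose groups are reasonably small.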



                    Hope that helps!



                    Good luck.
















                    answered Nov 21 '14 at 12:52









                    Guillaume G
























                        1














                        val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))
                        test: Seq[(String, (Int, Int, Int))] = List((key1,(1,2,3)), (key1,(4,5,6)), (key2,(7,8,9)), (key2,(0,1,2)))

                        test.foreach(println)
                        (key1,(1,2,3))
                        (key1,(4,5,6))
                        (key2,(7,8,9))
                        (key2,(0,1,2))

                        val rdd = sc.parallelize(test, 2)
                        rdd: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ParallelCollectionRDD[41] at parallelize at <console>:26

                        // Group by key, sort each group's rows by their first column, then index them (0-based)
                        val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(row => row._1).zipWithIndex))
                        rdd1: org.apache.spark.rdd.RDD[(String, Array[((Int, Int, Int), Int)])] = MapPartitionsRDD[44] at map at <console>:25

                        // Flatten back to one (key, row, index) record per row
                        val rdd2 = rdd1.flatMap { elem =>
                          val key = elem._1
                          elem._2.map(row => (key, row._1, row._2))
                        }
                        rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int, Int), Int)] = MapPartitionsRDD[45] at flatMap at <console>:25

                        rdd2.collect.foreach(println)
                        (key1,(1,2,3),0)
                        (key1,(4,5,6),1)
                        (key2,(0,1,2),0)
                        (key2,(7,8,9),1)
















                        answered Aug 2 '18 at 15:50









                        Wallace Huang













                        • what's your point ?
                          – Wallace Huang
                          Aug 11 '18 at 15:47





























                        0














                        Using Spark SQL (the DataFrame API), first read the data files:

                        import org.apache.spark.sql.functions
                        import org.apache.spark.sql.expressions.Window
                        import org.apache.spark.sql.types.DataTypes

                        val df = spark.read.json("s3://s3bukcet/key/activity/year=2018/month=12/date=15/*")

                        The files above have the fields user_id, pageviews and clicks.

                        Generate the activity ID (row_number) partitioned by user_id and ordered by clicks:

                        val output = df.withColumn("activity_id", functions.row_number().over(Window.partitionBy("user_id").orderBy("clicks")).cast(DataTypes.IntegerType))
















                            answered Dec 27 '18 at 20:03









                            Dakshin






























