Pyspark: Split multiple array columns into rows




















I have a dataframe which has one row, and several columns. Some of the columns are single values, and others are lists. All list columns are the same length. I want to split each list column into a separate row, while keeping any non-list column as is.



Sample DF:



from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+


What I want:



+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1|  7|foo|
|  1|  2|  8|foo|
|  1|  3|  9|foo|
+---+---+---+---+


If I only had one list column, this would be easy by just doing an explode:



df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+


However, if I also explode the c column, I end up with a dataframe whose length is the square of what I want:



df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+
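The row count grows because each explode multiplies rows independently of the other. A minimal plain-Python model (no Spark involved; explode_model is a hypothetical helper, not a PySpark function) makes the difference between chaining two explodes and pairing elements by position concrete:

```python
# A plain-Python stand-in for the single DataFrame row (not a Spark Row).
row = {"a": 1, "b": [1, 2, 3], "c": [7, 8, 9], "d": "foo"}


def explode_model(rows, name):
    """Hypothetical model of explode(): emit one copy of each row per element of row[name]."""
    for r in rows:
        for value in r[name]:
            yield {**r, name: value}


# Exploding b and then c pairs every b with every c (a Cartesian product): 3 * 3 rows.
twice = list(explode_model(explode_model([row], "b"), "c"))

# What the question asks for: pair the elements by position instead: 3 rows.
positional = [{**row, "b": b, "c": c} for b, c in zip(row["b"], row["c"])]

print(len(twice), len(positional))
```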


What I want is: for each position n, take the nth element of every array column and put those elements together in a new row. I've tried mapping an explode across all columns in the dataframe, but that doesn't seem to work either:



df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()









      python apache-spark dataframe pyspark apache-spark-sql






edited Jan 7 at 5:54 by Keith Hughitt
asked Dec 7 '16 at 21:02 by Steve
























2 Answers














          Spark >= 2.4



You can replace the zip_ UDF (shown below for older versions) with the built-in arrays_zip function:



from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))
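To make the semantics concrete, here is a plain-Python model of what arrays_zip followed by explode produces. arrays_zip_model is a hypothetical illustration, not PySpark code; the null padding for shorter arrays follows the Spark SQL function reference for arrays_zip and is worth double-checking against your Spark version:

```python
from itertools import zip_longest


def arrays_zip_model(**arrays):
    """Hypothetical model of arrays_zip: the i-th struct holds the i-th element
    of every input array; a shorter array contributes None (Spark's null)."""
    names = list(arrays)
    return [dict(zip(names, values)) for values in zip_longest(*arrays.values())]


zipped = arrays_zip_model(b=[1, 2, 3], c=[7, 8, 9])

# explode turns each struct into its own row; selecting tmp.b / tmp.c
# then flattens the struct back into ordinary columns.
rows = [{"a": 1, **s, "d": "foo"} for s in zipped]
```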


          Spark < 2.4



          With DataFrames and UDF:



from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust types to reflect data types
        StructField("first", IntegerType()),
        StructField("second", IntegerType())
    ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
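One practical wrinkle with the UDF: PySpark hands a null array to the Python function as None, and zip(None, ...) raises a TypeError on the executor. A minimal sketch of a null-safe body, assuming you want rows with a null array to produce no output rows (zip_or_none is a hypothetical name); you would wrap it the same way as the lambda above, e.g. udf(zip_or_none, ArrayType(...)):

```python
def zip_or_none(x, y):
    """Null-safe body for the zip_ UDF (hypothetical helper).

    Returns None when either input array is null, so the UDF yields a null
    array; explode produces no rows for a null array, so such rows are dropped.
    """
    if x is None or y is None:
        return None
    return list(zip(x, y))
```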


          With RDDs:



(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))
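The RDD version relies on flatMap rather than map. The plain-Python model below (itertools.chain stands in for Spark's flattening step; no Spark involved) shows the difference in output shape for the single sample row:

```python
from itertools import chain

rows = [{"a": 1, "b": [1, 2, 3], "c": [7, 8, 9], "d": "foo"}]


def to_tuples(row):
    # One input row -> a list of output tuples, paired by position
    return [(row["a"], b, c, row["d"]) for b, c in zip(row["b"], row["c"])]


# map keeps one output element per input row (here a single list of 3 tuples)
mapped = list(map(to_tuples, rows))

# flatMap flattens those lists, giving one output element per tuple
flat_mapped = list(chain.from_iterable(map(to_tuples, rows)))
```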


Both solutions are inefficient because of the overhead of moving data between the JVM and the Python workers. If the array length is fixed and known in advance, you can do something like this:



from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor
# (on Spark 2.0+ DataFrame.union is preferred; unionAll is its deprecated alias)
reduce(
    DataFrame.unionAll,
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")
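The union-based approach runs one select per index i and stacks the results. A plain-Python model with two made-up input rows (no Spark involved) shows the consequence: you get n rows per input row, but grouped by array position rather than by original row. Spark does not promise any particular row order without an explicit sort anyway, so sort if order matters:

```python
from functools import reduce
from operator import add

# Two made-up input rows, purely for illustration
rows = [
    {"a": 1, "b": [1, 2, 3], "c": [7, 8, 9], "d": "foo"},
    {"a": 2, "b": [4, 5, 6], "c": [10, 11, 12], "d": "bar"},
]
n = 3

# One "select" per index i, each emitting every input row once ...
per_index = [
    [(r["a"], r["b"][i], r["c"][i], r["d"]) for r in rows]
    for i in range(n)
]

# ... then concatenated, mirroring reduce(DataFrame.unionAll, ...)
unioned = reduce(add, per_index)
```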


          or even:



from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))


This should be significantly faster than the UDF or RDD approaches. Generalized to support an arbitrary number of columns:



# This uses keyword-only arguments (Python 3)
# If you use legacy Python you'll have to change the signature
# The body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

(df
    .withColumn("tmp", zip_and_explode("b", "c", n=3))
    .select("a", "tmp.b", "tmp.c", "d"))
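zip_and_explode only builds a Column: n structs, the i-th holding element i of every named array, exploded into n rows. The plain-Python model below (zip_and_explode_model is hypothetical, no Spark involved) mirrors that, under the stated assumption that getItem(i) returns null for an out-of-range index, which is the behaviour with ANSI mode disabled; with ANSI mode enabled, Spark may raise an error instead:

```python
def zip_and_explode_model(row, *colnames, n):
    """Hypothetical model of zip_and_explode applied to one row (a dict)."""

    def get_item(values, i):
        # Assumption: mirrors getItem(i) with ANSI mode off, where an
        # out-of-range index (or a null array) yields null instead of an error.
        return values[i] if values is not None and 0 <= i < len(values) else None

    # One output row per index, exactly n rows regardless of array length
    for i in range(n):
        out = dict(row)
        out.update({c: get_item(row[c], i) for c in colnames})
        yield out


example = {"a": 1, "b": [1, 2, 3], "c": [7, 8, 9], "d": "foo"}
exploded = list(zip_and_explode_model(example, "b", "c", n=3))
```

Because n is fixed, a value of n larger than the actual array length yields extra rows of nulls rather than fewer rows.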





edited Jan 7 at 6:32 by Keith Hughitt
answered Dec 7 '16 at 21:23 by user6910411














You need to use flatMap, not map, because you want to produce multiple output rows from each input row.



from pyspark.sql import Row

def dualExplode(r):
    rowDict = r.asDict()
    bList = rowDict.pop('b')
    cList = rowDict.pop('c')
    # Pair the b and c values by position and emit one Row per pair
    for b, c in zip(bList, cList):
        newDict = dict(rowDict)
        newDict['b'] = b
        newDict['c'] = c
        yield Row(**newDict)

# sqlContext is the SQLContext instance (called sqlc in the question)
df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))
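The same idea generalizes to any number of list columns. A minimal sketch, assuming it operates on the plain dict returned by Row.asDict() (multi_explode is a hypothetical helper, tested here without Spark):

```python
def multi_explode(row_dict, list_cols):
    """Hypothetical generalization of dualExplode: yield one dict per position,
    pairing the elements of every column in list_cols and copying the rest."""
    base = {k: v for k, v in row_dict.items() if k not in list_cols}
    # zip stops at the shortest list, just like dualExplode
    for values in zip(*(row_dict[c] for c in list_cols)):
        out = dict(base)
        out.update(zip(list_cols, values))
        yield out
```

In Spark you could then write something like df.rdd.flatMap(lambda r: [Row(**d) for d in multi_explode(r.asDict(), ["b", "c"])]) and pass the result to createDataFrame as above.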





























• If the first df has 3 values and the second df has 2 values, our zip returns two pairs instead of 3. Could you advise on it?

              – Dugini Vijay
              Aug 14 '18 at 17:14











• zip pairs the 1st element of one object with the 1st element of another, the 2nd with the 2nd, and so on, until one of the objects runs out of elements; in your case, after 2 values. To give any suggestions, I'd need to know how you want your program to deal with the unpaired element (e.g. do you want a null from the 2nd set?). Also, there is only 1 df in this example. If your question is that different from this one, it's probably better to ask a new question.

              – David
              Aug 14 '18 at 19:15













• Thanks @David for your reply. I figured it out: using izip helped me solve this issue. Still, I appreciate your response, mate.

              – Dugini Vijay
              Aug 15 '18 at 18:17
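For the unequal-length case raised in these comments: zip truncates to the shortest input, whereas itertools.zip_longest (izip_longest in Python 2) pads the shorter list, here with None, which becomes null once the rows are turned back into a DataFrame. A minimal sketch; whether padding is the right behaviour is an assumption about what you want, and you would swap zip_longest in for zip inside dualExplode:

```python
from itertools import zip_longest

b = [1, 2, 3]
c = [7, 8]

# zip stops at the shortest input, so the third b value is silently dropped
truncated = list(zip(b, c))

# zip_longest keeps going and fills the missing c value with fillvalue
padded = list(zip_longest(b, c, fillvalue=None))
```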












            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f41027315%2fpyspark-split-multiple-array-columns-into-rows%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            2 Answers
            2






            active

            oldest

            votes








            2 Answers
            2






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            44














            Spark >= 2.4



            You can replace zip_ udf with arrays_zip function



            from pyspark.sql.functions import arrays_zip, col

            (df
            .withColumn("tmp", arrays_zip("b", "c"))
            .withColumn("tmp", explode("tmp"))
            .select("a", col("tmp.b"), col("tmp.c"), "d"))


            Spark < 2.4



            With DataFrames and UDF:



            from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
            from pyspark.sql.functions import col, udf, explode

            zip_ = udf(
            lambda x, y: list(zip(x, y)),
            ArrayType(StructType([
            # Adjust types to reflect data types
            StructField("first", IntegerType()),
            StructField("second", IntegerType())
            ]))
            )

            (df
            .withColumn("tmp", zip_("b", "c"))
            # UDF output cannot be directly passed to explode
            .withColumn("tmp", explode("tmp"))
            .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))


            With RDDs:



            (df
            .rdd
            .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
            .toDF(["a", "b", "c", "d"]))


            Both solutions are inefficient due to Python communication overhead. If data size is fixed you can do something like this:



            from functools import reduce
            from pyspark.sql import DataFrame

            # Length of array
            n = 3

            # For legacy Python you'll need a separate function
            # in place of method accessor
            reduce(
            DataFrame.unionAll,
            (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
            for i in range(n))
            ).toDF("a", "b", "c", "d")


            or even:



            from pyspark.sql.functions import array, struct

            # SQL level zip of arrays of known size
            # followed by explode
            tmp = explode(array(*[
            struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
            for i in range(n)
            ]))

            (df
            .withColumn("tmp", tmp)
            .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))


            This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:



            # This uses keyword only arguments
            # If you use legacy Python you'll have to change signature
            # Body of the function can stay the same
            def zip_and_explode(*colnames, n):
            return explode(array(*[
            struct(*[col(c).getItem(i).alias(c) for c in colnames])
            for i in range(n)
            ]))

            df.withColumn("tmp", zip_and_explode("b", "c", n=3))





            share|improve this answer






























              44














              Spark >= 2.4



              You can replace zip_ udf with arrays_zip function



              from pyspark.sql.functions import arrays_zip, col

              (df
              .withColumn("tmp", arrays_zip("b", "c"))
              .withColumn("tmp", explode("tmp"))
              .select("a", col("tmp.b"), col("tmp.c"), "d"))


              Spark < 2.4



              With DataFrames and UDF:



              from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
              from pyspark.sql.functions import col, udf, explode

              zip_ = udf(
              lambda x, y: list(zip(x, y)),
              ArrayType(StructType([
              # Adjust types to reflect data types
              StructField("first", IntegerType()),
              StructField("second", IntegerType())
              ]))
              )

              (df
              .withColumn("tmp", zip_("b", "c"))
              # UDF output cannot be directly passed to explode
              .withColumn("tmp", explode("tmp"))
              .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))


              With RDDs:



              (df
              .rdd
              .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
              .toDF(["a", "b", "c", "d"]))


              Both solutions are inefficient due to Python communication overhead. If data size is fixed you can do something like this:



              from functools import reduce
              from pyspark.sql import DataFrame

              # Length of array
              n = 3

              # For legacy Python you'll need a separate function
              # in place of method accessor
              reduce(
              DataFrame.unionAll,
              (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
              for i in range(n))
              ).toDF("a", "b", "c", "d")


              or even:



              from pyspark.sql.functions import array, struct

              # SQL level zip of arrays of known size
              # followed by explode
              tmp = explode(array(*[
              struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
              for i in range(n)
              ]))

              (df
              .withColumn("tmp", tmp)
              .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))


              This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:



              # This uses keyword only arguments
              # If you use legacy Python you'll have to change signature
              # Body of the function can stay the same
              def zip_and_explode(*colnames, n):
              return explode(array(*[
              struct(*[col(c).getItem(i).alias(c) for c in colnames])
              for i in range(n)
              ]))

              df.withColumn("tmp", zip_and_explode("b", "c", n=3))





              share|improve this answer




























                44












                44








                44







                Spark >= 2.4



                You can replace zip_ udf with arrays_zip function



                from pyspark.sql.functions import arrays_zip, col

                (df
                .withColumn("tmp", arrays_zip("b", "c"))
                .withColumn("tmp", explode("tmp"))
                .select("a", col("tmp.b"), col("tmp.c"), "d"))


                Spark < 2.4



                With DataFrames and UDF:



                from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
                from pyspark.sql.functions import col, udf, explode

                zip_ = udf(
                lambda x, y: list(zip(x, y)),
                ArrayType(StructType([
                # Adjust types to reflect data types
                StructField("first", IntegerType()),
                StructField("second", IntegerType())
                ]))
                )

                (df
                .withColumn("tmp", zip_("b", "c"))
                # UDF output cannot be directly passed to explode
                .withColumn("tmp", explode("tmp"))
                .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))


                With RDDs:



                (df
                .rdd
                .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
                .toDF(["a", "b", "c", "d"]))


                Both solutions are inefficient due to Python communication overhead. If data size is fixed you can do something like this:



                from functools import reduce
                from pyspark.sql import DataFrame

                # Length of array
                n = 3

                # For legacy Python you'll need a separate function
                # in place of method accessor
                reduce(
                DataFrame.unionAll,
                (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
                for i in range(n))
                ).toDF("a", "b", "c", "d")


                or even:



                from pyspark.sql.functions import array, struct

                # SQL level zip of arrays of known size
                # followed by explode
                tmp = explode(array(*[
                struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
                for i in range(n)
                ]))

                (df
                .withColumn("tmp", tmp)
                .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))


                This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:



                # This uses keyword only arguments
                # If you use legacy Python you'll have to change signature
                # Body of the function can stay the same
                def zip_and_explode(*colnames, n):
                return explode(array(*[
                struct(*[col(c).getItem(i).alias(c) for c in colnames])
                for i in range(n)
                ]))

                df.withColumn("tmp", zip_and_explode("b", "c", n=3))





                share|improve this answer















                Spark >= 2.4



                You can replace zip_ udf with arrays_zip function



                from pyspark.sql.functions import arrays_zip, col

                (df
                .withColumn("tmp", arrays_zip("b", "c"))
                .withColumn("tmp", explode("tmp"))
                .select("a", col("tmp.b"), col("tmp.c"), "d"))


                Spark < 2.4



                With DataFrames and UDF:



                from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
                from pyspark.sql.functions import col, udf, explode

                zip_ = udf(
                lambda x, y: list(zip(x, y)),
                ArrayType(StructType([
                # Adjust types to reflect data types
                StructField("first", IntegerType()),
                StructField("second", IntegerType())
                ]))
                )

                (df
                .withColumn("tmp", zip_("b", "c"))
                # UDF output cannot be directly passed to explode
                .withColumn("tmp", explode("tmp"))
                .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))


                With RDDs:



                (df
                .rdd
                .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
                .toDF(["a", "b", "c", "d"]))


                Both solutions are inefficient due to Python communication overhead. If data size is fixed you can do something like this:



                from functools import reduce
                from pyspark.sql import DataFrame

                # Length of array
                n = 3

                # For legacy Python you'll need a separate function
                # in place of method accessor
                reduce(
                DataFrame.unionAll,
                (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
                for i in range(n))
                ).toDF("a", "b", "c", "d")


                or even:



                from pyspark.sql.functions import array, struct

                # SQL level zip of arrays of known size
                # followed by explode
                tmp = explode(array(*[
                struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
                for i in range(n)
                ]))

                (df
                .withColumn("tmp", tmp)
                .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))


                This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:



                # This uses keyword only arguments
                # If you use legacy Python you'll have to change signature
                # Body of the function can stay the same
                def zip_and_explode(*colnames, n):
                return explode(array(*[
                struct(*[col(c).getItem(i).alias(c) for c in colnames])
                for i in range(n)
                ]))

                df.withColumn("tmp", zip_and_explode("b", "c", n=3))






                share|improve this answer














                share|improve this answer



                share|improve this answer








                edited Jan 7 at 6:32









                Keith Hughitt

                2,78352947




                2,78352947










                answered Dec 7 '16 at 21:23









                user6910411user6910411

                35.8k1090111




                35.8k1090111

























                    8














                    You'd need to use flatMap, not map as you want to make multiple output rows out of each input row.



                    from pyspark.sql import Row
                    def dualExplode(r):
                    rowDict = r.asDict()
                    bList = rowDict.pop('b')
                    cList = rowDict.pop('c')
                    for b,c in zip(bList, cList):
                    newDict = dict(rowDict)
                    newDict['b'] = b
                    newDict['c'] = c
                    yield Row(**newDict)

                    df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))





                    share|improve this answer
























                    • if the first df has 3 values and second df has 2 values, our zip happens to be returning two pairs instead of 3. Could you advice on it.

                      – Dugini Vijay
                      Aug 14 '18 at 17:14











                    • Zip pairs together the first element of an obj with the 1st element of another object, 2nd with 2nd, etc until one of the objects runs out of elements. In your case, after 2 values. Said another way, it will pair up elements until there are no more items to pair. To give any suggestions, I'd need to know how you want your program to deal with the un-paired element (eg do you want a null from the 2nd set?). Also, there is only 1 df in this example. If your question is that different from this one, it's probably better to just ask another question

                      – David
                      Aug 14 '18 at 19:15













                    • Thanks @David for your reply. I figured it out. Using Izip helped over to solve this issue. But still I appreciate your response mate.

                      – Dugini Vijay
                      Aug 15 '18 at 18:17


























                    answered Dec 7 '16 at 21:24









                    David
