Spark Cluster configuration












I'm using a Spark cluster of two nodes, each running two executors (each executor using 2 cores and 6 GB of memory).



Is this a good cluster configuration for faster execution of my Spark jobs?



I am fairly new to Spark, and I am running a job on 80 million rows of data that includes shuffle-heavy tasks such as aggregations (count) and join operations (a self-join on a DataFrame).



Bottlenecks:




  1. Spark reports insufficient resources for my executors while reading the data.

  2. Even on a smaller dataset, the job takes a lot of time.


What should my approach be, and how can I remove these bottlenecks?

Any suggestion would be highly appreciated.



    query = "(Select x, y, z from table) as df"

    jdbcDF = spark.read.format("jdbc") \
        .option("url", mysqlUrl) \
        .option("dbtable", query) \
        .option("user", mysqldetails[2]) \
        .option("password", mysqldetails[3]) \
        .option("numPartitions", "1000") \
        .load()


This gives me a DataFrame for which jdbcDF.rdd.getNumPartitions() returns 1. Am I missing something here? I think I am not parallelizing my dataset.
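
For context: the JDBC source only splits a read across multiple partitions when partitionColumn, lowerBound, and upperBound are supplied together with numPartitions; with numPartitions alone, the whole query is fetched into a single partition. Below is a minimal sketch of a partitioned read; the column name id and the bounds are placeholder assumptions (the chosen column should be numeric and must appear in the subquery's select list):

    # Sketch only: parallel JDBC read by range-splitting on a numeric column.
    # "id", 1 and 80000000 are placeholders; use a real indexed column and
    # its actual min/max values.
    jdbcDF = spark.read.format("jdbc") \
        .option("url", mysqlUrl) \
        .option("dbtable", query) \
        .option("user", mysqldetails[2]) \
        .option("password", mysqldetails[3]) \
        .option("partitionColumn", "id") \
        .option("lowerBound", "1") \
        .option("upperBound", "80000000") \
        .option("numPartitions", "100") \
        .load()

    print(jdbcDF.rdd.getNumPartitions())  # should now be greater than 1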










apache-spark pyspark

asked Jan 2 at 9:19 by Vivek; edited Jan 3 at 9:37

1 Answer














There are different ways to improve the performance of your application. Below are some points that may help.

1. Try to reduce the number of records and columns you process. As you mentioned you are new to Spark, you may not need all 80 million rows, so filter the rows down to what you actually require. Likewise, select only the columns you need rather than all of them.

2. If you use some data repeatedly, consider caching it so that subsequent operations read it from memory (see the sketch after this list).

3. If you are joining two DataFrames and one of them is small enough to fit in memory, consider a broadcast join (also shown in the sketch below).

4. Increasing resources will not improve performance in every case, but given your cluster configuration, it should help here. It may be worth adding more resources and checking the performance.

5. Use the Spark UI to monitor your application and check whether a few tasks take much longer than the others; if so, you probably need to deal with skew in your data.

6. Consider partitioning your data on the columns you use in your filter criteria.
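
A minimal PySpark sketch of points 2 and 3: caching a DataFrame that is reused across actions, and broadcasting the smaller side of a join. The lookup DataFrame small_df, its contents, and the join column x are illustrative assumptions (x is only borrowed from the columns selected in the question's query):

    from pyspark.sql import functions as F

    # Point 2: cache a DataFrame that several later actions reuse, so it is
    # served from memory instead of being re-read from the JDBC source.
    big_df = jdbcDF.cache()
    big_df.count()  # the first action materializes the cache

    # Point 3: broadcast the small side of a join so the large side is not
    # shuffled. small_df and the join column "x" are placeholders.
    small_df = spark.createDataFrame([(1, "a"), (2, "b")], ["x", "label"])
    joined = big_df.join(F.broadcast(small_df), on="x", how="inner")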







answered Jan 2 at 10:55 by neeraj bhadani; edited Jan 2 at 16:56 by pault


























• Thank you for your recommendations. I have already tried a few of the points mentioned: I am only reading the required data, and I am using a broadcast hash join. However, I do not know much about using the Spark UI to monitor my tasks, and I found that the Spark JDBC reader is not parallelizing my dataset when reading; it gives me the data in a single partition regardless of the numPartitions set in the configuration.
  – Vivek, Jan 3 at 9:20













• I have added a snippet of my code where the issue seems to be. Could you help me with this?
  – Vivek, Jan 3 at 9:38











• Once you have created jdbcDF, try repartitioning the DataFrame on some column and then check the partitions of the new DataFrame: newdf = jdbcDF.repartition(1000, "col1"); newdf.rdd.getNumPartitions()
  – neeraj bhadani, Jan 3 at 9:44













• I am avoiding repartitioning because it involves reshuffling, and it is not improving my computational efficiency. I am confused about why there is no partitioning when reading the data from the database.
  – Vivek, Jan 3 at 11:00










