Spark Cluster configuration
I'm using a Spark cluster with two nodes, each running two executors (each executor with 2 cores and 6 GB of memory).
Is this a good cluster configuration for faster execution of my Spark jobs?
I am fairly new to Spark, and I am running a job on 80 million rows of data that includes shuffle-heavy tasks such as aggregations (count) and join operations (a self join on a DataFrame).
Bottlenecks:
- The executors report insufficient resources while reading the data.
- Even on a smaller dataset, the job takes a long time.
What should my approach be, and how can I remove these bottlenecks?
Any suggestion would be highly appreciated. Here is how I am reading the data:
query= "(Select x,y,z from table) as df"
jdbcDF = spark.read.format("jdbc").option("url", mysqlUrl)
.option("dbtable", query)
.option("user", mysqldetails[2])
.option("password", mysqldetails[3])
.option("numPartitions", "1000")
.load()
This gives me a DataFrame for which jdbcDF.rdd.getNumPartitions() returns 1. Am I missing something here? I think I am not parallelizing my dataset.
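For reference, Spark's JDBC source only splits a read into multiple partitions when partitioning bounds are supplied alongside numPartitions. Below is a minimal sketch of a partitioned read; the partition column name `id`, its bounds, and the adjusted subquery are assumptions for illustration, not details from the actual table:

```python
# Sketch of a parallel JDBC read. The partition column must be numeric
# (or date/timestamp) and must appear in the selected columns, so `id`
# is added to the subquery here; `id` and its bounds are assumptions.
partitioned_query = "(SELECT id, x, y, z FROM table) AS df"

jdbcDF = (spark.read.format("jdbc")
    .option("url", mysqlUrl)
    .option("dbtable", partitioned_query)
    .option("user", mysqldetails[2])
    .option("password", mysqldetails[3])
    .option("partitionColumn", "id")    # assumed numeric column to split on
    .option("lowerBound", "1")          # assumed minimum of `id`
    .option("upperBound", "80000000")   # assumed maximum of `id`
    .option("numPartitions", "100")     # number of concurrent reads
    .load())

print(jdbcDF.rdd.getNumPartitions())    # should now be close to 100
```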
apache-spark pyspark
asked Jan 2 at 9:19 by Vivek, edited Jan 3 at 9:37
1 Answer
There are different ways to improve the performance of your application. Below are some points that may help.
- Reduce the number of records and columns you process. Since you are new to Spark, you may not need all 80 million rows, so filter the rows down to what you actually require, and select only the columns you need rather than all of them.
- If you use some data repeatedly, consider caching it so that subsequent operations read it from memory (see the sketch after this list).
- If you are joining two DataFrames and one of them is small enough to fit in memory, consider a broadcast join (also shown in the sketch below).
- Adding resources does not improve performance in every case, but given your cluster configuration it should help here. It may be worth adding more resources and measuring the difference.
- Use the Spark UI to monitor your application and check whether a few tasks take much longer than the others; if so, you probably need to deal with skew in your data.
- Consider partitioning your data on the columns you use in your filter criteria.

answered Jan 2 at 10:55 by neeraj bhadani, edited Jan 2 at 16:56 by pault
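To make the caching and broadcast-join suggestions concrete, here is a minimal PySpark sketch. It reuses `jdbcDF` from the question; the small lookup DataFrame and the join key `x` are assumptions for illustration:

```python
from pyspark.sql.functions import broadcast

# Cache a DataFrame that is reused across several actions, so later
# operations read it from memory instead of re-running the JDBC read.
jdbcDF.cache()
jdbcDF.count()  # an action to materialise the cache

# Broadcast join: the small side is shipped to every executor, so the
# 80M-row side is joined without a shuffle. `lookup_df` is a hypothetical
# small DataFrame keyed on column "x" from the query above.
lookup_df = spark.createDataFrame([(1, "a"), (2, "b")], ["x", "label"])
joined = jdbcDF.join(broadcast(lookup_df), on="x", how="inner")
joined.show()
```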
Thank you for your recommendations. I have already tried a few of the points mentioned: I am only reading the required data and I am using a broadcast hash join. However, I do not know much about using the Spark UI to monitor my tasks, and I found that the Spark JDBC reader is not parallelizing my dataset on read; it gives me the data in a single partition regardless of the numPartitions value set in the configuration.
– Vivek, Jan 3 at 9:20

I have added a snippet of my code where there seems to be an issue. Could you help me with this?
– Vivek, Jan 3 at 9:38

Once you have created jdbcDF, try repartitioning the DataFrame on some column and then check the partitions of the new DataFrame: newdf = jdbcDF.repartition(1000, "col1"); newdf.rdd.getNumPartitions()
– neeraj bhadani, Jan 3 at 9:44

I am avoiding repartitioning because it involves reshuffling, and it is not improving my computational efficiency. I am confused about why there is no partitioning when reading the data from the database.
– Vivek, Jan 3 at 11:00
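If the table has no convenient numeric column to split on, the PySpark DataFrameReader.jdbc method also accepts a list of predicates, one per partition, as an alternative way to parallelize the read. A hedged sketch follows; the column `created_at` and the date ranges are hypothetical:

```python
# Alternative: partition the JDBC read with explicit predicates, one per
# partition. The column `created_at` and the date ranges are hypothetical.
predicates = [
    "created_at <  '2018-07-01'",
    "created_at >= '2018-07-01' AND created_at < '2019-01-01'",
    "created_at >= '2019-01-01'",
]

jdbcDF = spark.read.jdbc(
    url=mysqlUrl,
    table="(SELECT x, y, z, created_at FROM table) AS df",
    predicates=predicates,
    properties={"user": mysqldetails[2], "password": mysqldetails[3]},
)

print(jdbcDF.rdd.getNumPartitions())  # one partition per predicate (3 here)
```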