Read multiple directories separately with Spark and write them back in the same structure

I have 3 years' worth of data partitioned by year/month/day/hour.
I want to read all the files, do some processing, and write them back to a new S3 bucket with the same structure. If data comes from s3://spark-old/production/events/2018/12/20/*, I want to persist it into s3://spark-new/production/events/2018/12/20/* after the computation is done.
The data were written to s3://spark-old/production/events/ by Kinesis and Firehose, so technically Firehose decided which partition each record landed in (based on the time Firehose consumed it, I think).
Some requirements:
- I want the same record to be in the same partition in both the old and the new S3 bucket.
- I want this job to be as fast as possible (I will probably need to run it once a month).
- It should not consume all of my resources (I still have other hourly and daily jobs that need to run).
I have tried a couple of approaches, but none of them is perfect:
1- Write a Spark job that reads the data hour by hour, processes it, and writes it back to the same path that it read the data from (under the new bucket). Then use Oozie or any other scheduler to schedule one hourly job for every hour from the beginning of the data until now. Since each job is independent of the others, increase the concurrency on the Oozie coordinator to something like 14 or 48 and wait until all of them have finished. A minimal sketch of such an hourly job is shown below.
The problem is that each hour of data is not that big, so most of each Spark job's runtime is wasted creating a Spark session and shutting it down at the end. It takes a lot of time to process 3 years of data this way.
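To make option 1 concrete, this is a minimal sketch of the hourly job, assuming the events are JSON and using a hypothetical processEvents() placeholder for the actual computation; only the bucket paths come from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object HourlyReprocessJob {
  def main(args: Array[String]): Unit = {
    // e.g. args = Array("2018", "12", "20", "05")
    val Array(year, month, day, hour) = args

    val spark = SparkSession.builder()
      .appName(s"reprocess-$year-$month-$day-$hour")
      .getOrCreate()

    val srcPath = s"s3://spark-old/production/events/$year/$month/$day/$hour/"
    val dstPath = s"s3://spark-new/production/events/$year/$month/$day/$hour/"

    val input  = spark.read.json(srcPath)   // JSON format is an assumption
    val output = processEvents(input)       // stand-in for the real computation

    // Same relative path as the source, just under the new bucket
    output.write.mode("overwrite").json(dstPath)

    spark.stop()
  }

  // Hypothetical placeholder for whatever per-record processing is needed
  def processEvents(df: DataFrame): DataFrame = df
}
```

Because every invocation pays the full cost of starting and stopping a SparkSession, this is exactly where the per-hour overhead described above comes from.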
2- Instead of reading each hour, read a day or a month of data from the source s3://spark-old/production/events/ and save it back to the destination with the help of partitionBy("year", "month", "day", "hour") (see the sketch after this list). There are 2 issues with this approach:
- When you use DataFrame.write.partitionBy(...).save(...), it writes the data to S3 paths in a new, Hive-style format: s3://spark-new/production/events/year=2018/month=12/day=20/*
- Since we have to derive the partition columns ourselves for partitionBy("year", "month", "day", "hour"), which we do based on server_received_time (the closest time we have to when the record was consumed by Firehose), there is still no guarantee that data read from partition A ends up back in partition A; a small percentage of the data moves into a different partition.
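Here is a minimal sketch of option 2, assuming JSON input and that server_received_time is a timestamp column (both are assumptions); the Hive-style output layout shown in the comment is what partitionBy produces by default.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("monthly-reprocess").getOrCreate()

// Read one month of source data (path glob is illustrative)
val input = spark.read.json("s3://spark-old/production/events/2018/12/*/*/")

// Derive the partition columns from server_received_time ourselves
val withPartitions = input
  .withColumn("year",  date_format(col("server_received_time"), "yyyy"))
  .withColumn("month", date_format(col("server_received_time"), "MM"))
  .withColumn("day",   date_format(col("server_received_time"), "dd"))
  .withColumn("hour",  date_format(col("server_received_time"), "HH"))

// This produces Hive-style paths such as
//   s3://spark-new/production/events/year=2018/month=12/day=20/hour=05/
// rather than .../2018/12/20/05/, which is the first issue above; and because
// the partition values are re-derived from server_received_time, some records
// can land in a different hour than the one they were read from (second issue).
withPartitions.write
  .partitionBy("year", "month", "day", "hour")
  .mode("overwrite")
  .json("s3://spark-new/production/events/")
```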
3- Try to read a couple of hours (as a batch) in a single Spark job (one Spark session) and write them back in parallel within Spark. I am still trying to figure out whether this is doable and whether it is a good approach (a rough sketch is included below).
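A rough sketch of option 3, under the assumption that several hourly paths can be submitted from separate threads against one shared SparkSession; the hour list and JSON format are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val spark = SparkSession.builder().appName("batch-of-hours").getOrCreate()

// One entry per hourly partition in this batch (illustrative values)
val hours = Seq("2018/12/20/00", "2018/12/20/01", "2018/12/20/02")

// Each Future submits its own read/process/write against the shared session,
// so the Spark scheduler can run the hourly jobs concurrently.
val jobs = hours.map { h =>
  Future {
    val df = spark.read.json(s"s3://spark-old/production/events/$h/")
    // the actual processing step would go here
    df.write.mode("overwrite").json(s"s3://spark-new/production/events/$h/")
  }
}

// A FAIR scheduler pool can keep one large hour from starving the others.
Await.result(Future.sequence(jobs), Duration.Inf)
```

This amortizes the SparkSession startup cost over many hours while keeping each hour's output in its original path, which is why I am considering it.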
I want to know the best way to do this with all of the requirements satisfied.
scala apache-spark amazon-s3
asked Jan 3 at 21:39 by Am1rr3zA