Read multiple directories separately with Spark and write them back in the same structure

I have 3 years' worth of data partitioned by year/month/day/hour. I want to read all the files, do some processing, and write the results to a new S3 bucket with the same structure.



If data comes from s3://spark-old/production/events/2018/12/20/*, I want to persist it to s3://spark-new/production/events/2018/12/20/* after the computation is done.



The data were written to s3://spark-old/production/events/ by Kinesis Firehose, so technically Firehose decided which partition each record went to (based on the time Firehose consumed it, I think).



Some requirements:




  • Each record should end up in the same partition in both the old and the new S3 bucket.

  • The job should be as fast as possible (I will probably need to run it once a month).

  • It should not consume all of my cluster resources (I still have other hourly and daily jobs that need to run).


I have tried a couple of approaches, but none of them is perfect.



1- Write an hourly Spark job that reads one hour of data, processes it, and writes it to the same relative path it was read from. Then use Oozie (or any other scheduler) to schedule an hourly run from the beginning of the data until now. Since each run is independent of the others, I can increase the concurrency on the Oozie coordinator to something like 14 or 48 and wait until all of them finish. A rough sketch of the job follows.
The problem is that each hour of data is not that big, so most of each Spark job is wasted creating a Spark session and shutting it down at the end. It takes a very long time to process 3 years of data this way.
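Roughly, each hourly run looks like the sketch below (the JSON format, the process step, and the argument handling are placeholders for my actual data and computation):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object HourlyBackfill {
      // placeholder for the real per-record computation
      def process(df: DataFrame): DataFrame = df

      def main(args: Array[String]): Unit = {
        // e.g. args = Array("2018", "12", "20", "05")
        val Array(year, month, day, hour) = args

        val spark = SparkSession.builder()
          .appName(s"backfill-$year-$month-$day-$hour")
          .getOrCreate()

        val src = s"s3://spark-old/production/events/$year/$month/$day/$hour/"
        val dst = s"s3://spark-new/production/events/$year/$month/$day/$hour/"

        // read one hour, process it, and write it to the mirrored path in the new bucket
        process(spark.read.json(src))
          .write.mode("overwrite").json(dst)

        spark.stop()
      }
    }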



2- Instead of reading hour by hour, read a day or a month of data from the source s3://spark-old/production/events/ and save it back to the destination with the help of partitionBy("year", "month", "day", "hour"). There are 2 issues with this approach (a sketch of what I am doing is shown after the list below):




  1. When you use DataFrame.write.partitionBy(...).save(...), it writes the data to S3 paths in Hive-style format, e.g. s3://spark-new/production/events/year=2018/month=12/day=20/*, not the original 2018/12/20/* layout.

  2. Since we have to derive the partition columns ourselves for partitionBy("year", "month", "day", "hour"), which we do based on server_received_time (the closest thing we have to the time Firehose consumed the record), there is no guarantee that a record read from partition A will be written back to partition A; a small percentage of the data ends up in a different partition.
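For reference, this is roughly what I am doing in this approach (assuming an existing spark session and that server_received_time can be cast to a timestamp; the JSON format is just an example):

    import org.apache.spark.sql.functions._

    // read a whole month from the old bucket
    val events = spark.read.json("s3://spark-old/production/events/2018/12/*/*/")

    // derive the partition columns ourselves from server_received_time
    val withParts = events
      .withColumn("ts",    col("server_received_time").cast("timestamp"))
      .withColumn("year",  date_format(col("ts"), "yyyy"))
      .withColumn("month", date_format(col("ts"), "MM"))
      .withColumn("day",   date_format(col("ts"), "dd"))
      .withColumn("hour",  date_format(col("ts"), "HH"))

    withParts.write
      .partitionBy("year", "month", "day", "hour")
      .json("s3://spark-new/production/events/")

    // result: s3://spark-new/production/events/year=2018/month=12/day=20/hour=05/...
    // i.e. Hive-style key=value directories instead of the original layout, and a
    // record can land in a different hour than the one it was read from.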


3- Read a batch of a few hours in a single Spark job (one Spark session) and write them back in parallel within Spark. I am still trying to figure out whether this is doable and whether it is a good approach; a sketch of the idea follows.
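The idea is something like the sketch below, run inside a single Spark session (spark is the existing session; the hour list, JSON format, and process step are placeholders):

    import org.apache.spark.sql.DataFrame
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    // placeholder for the real computation
    val process: DataFrame => DataFrame = identity

    // one batch of hour partitions to handle in this single session (placeholder values)
    val hours = Seq(("2018", "12", "20", "00"), ("2018", "12", "20", "01"), ("2018", "12", "20", "02"))

    // submit one independent read/process/write per hour from separate threads so the
    // Spark scheduler can run the resulting jobs concurrently
    val jobs = hours.map { case (y, m, d, h) =>
      Future {
        val src = s"s3://spark-old/production/events/$y/$m/$d/$h/"
        val dst = s"s3://spark-new/production/events/$y/$m/$d/$h/"
        spark.read.json(src)
          .transform(process)
          .write.mode("overwrite").json(dst)
      }
    }

    Await.result(Future.sequence(jobs), Duration.Inf)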



I want to know the best way to do this while satisfying all of the requirements above.










      scala apache-spark amazon-s3






asked Jan 3 at 21:39 by Am1rr3zA























