Batch insert to BigQuery with Dataflow





I am using an Apache Beam pipeline and I want to batch insert into BigQuery with Python. My data comes from Pub/Sub, which is unbounded. Based on my research, GlobalWindows with triggers should solve my problem. I tried my pipeline with windowing, but it still does streaming inserts. My pipeline code is the following:



import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (AccumulationMode, AfterAny,
                                            AfterCount, AfterProcessingTime,
                                            Repeatedly)

def writeTable(table_name, batch_size):
    return beam.io.WriteToBigQuery(
        table=table_name,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        batch_size=batch_size)

p2 = (p | 'Read ' >> beam.io.ReadFromPubSub(subscription=subscription_path,
                                            with_attributes=True,
                                            timestamp_attribute=None,
                                            id_label=None)
        | 'Windowing' >> beam.WindowInto(window.GlobalWindows(),
                                         trigger=Repeatedly(
                                             AfterAny(
                                                 AfterCount(100),
                                                 AfterProcessingTime(1 * 60))),
                                         accumulation_mode=AccumulationMode.DISCARDING)
        | 'Process ' >> beam.Map(getAttributes))

p3 = (p2 | 'Filter ' >> beam.Filter(lambda msg: ("xx" in msg) and (msg["xx"].lower() == "true"))
         | 'Delete ' >> beam.Map(deleteAttribute)
         | 'Write ' >> writeTable(bq_table_test, bq_batch_size))


I am checking the Billing Reports to see whether the inserts are batch or streaming. When the streaming-insert usage increases, I understand that the bulk insertion did not happen. Is there another way to check whether an insertion was streaming or batch? And how can I do batch inserts to BigQuery?

python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam






asked Jan 4 at 10:41 by eebruu









1 Answer


















According to the documentation you cannot specify the insert type; it is chosen automatically based on your input PCollection:




          The Beam SDK for Python does not currently support specifying the
          insertion method.



          BigQueryIO supports two methods of inserting data into BigQuery: load
          jobs and streaming inserts. Each insertion method provides different
          tradeoffs of cost, quota, and data consistency. See the BigQuery
          documentation for load jobs and streaming inserts for more information
          about these tradeoffs.



          BigQueryIO chooses a default insertion method based on the input
          PCollection.



          BigQueryIO uses load jobs when you apply a BigQueryIO write transform
          to a bounded PCollection.



          BigQueryIO uses streaming inserts when you apply a BigQueryIO write
          transform to an unbounded PCollection.




In your case you're reading from an unbounded source (Pub/Sub), so the writes will always be streaming inserts. Windowing does not change whether the PCollection is bounded or unbounded.



One workaround I can think of is to split the pipeline: a streaming pipeline writes to a collection of files in some storage (e.g. GCS), and a separate batch pipeline then reads and loads those files, since the files are a bounded source. A rough sketch of that idea follows.
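A minimal sketch of that two-pipeline split. The bucket, table, and subscription names are hypothetical placeholders; it also assumes a Beam release in which fileio.WriteToFiles supports windowed writes from a streaming pipeline, and that each Pub/Sub message payload is a JSON row.

import json

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import window

STAGING_PATH = 'gs://my-bucket/staging/'                   # hypothetical GCS location
OUTPUT_TABLE = 'my-project:my_dataset.my_table'            # hypothetical BigQuery table
SUBSCRIPTION = 'projects/my-project/subscriptions/my-sub'  # hypothetical subscription

# Pipeline 1 (streaming): Pub/Sub -> newline-delimited JSON files on GCS.
streaming_opts = PipelineOptions()
streaming_opts.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=streaming_opts) as p:
    (p
     | 'Read' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
     | 'Decode' >> beam.Map(lambda payload: payload.decode('utf-8'))
     | 'Window' >> beam.WindowInto(window.FixedWindows(5 * 60))
     | 'WriteFiles' >> fileio.WriteToFiles(path=STAGING_PATH,
                                           sink=lambda dest: fileio.TextSink()))

# Pipeline 2 (batch, run separately, e.g. on a schedule): the staged files are a
# bounded source, so WriteToBigQuery uses load jobs instead of streaming inserts.
with beam.Pipeline(options=PipelineOptions()) as p:
    (p
     | 'ReadFiles' >> beam.io.ReadFromText(STAGING_PATH + '*')
     | 'Parse' >> beam.Map(json.loads)
     | 'Load' >> beam.io.WriteToBigQuery(
           table=OUTPUT_TABLE,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

The second pipeline could be kicked off by cron, Cloud Composer, or any scheduler; the key point is that the BigQuery write is applied to a bounded PCollection, so it goes through load jobs rather than the streaming-insert API.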






answered Jan 4 at 16:23 by Anton
• Is there any method to convert an unbounded PCollection to bounded PCollection on the way? – Onur, Jan 7 at 5:29











• Batch insert to Bigquery with Dataflow is available for Java (stackoverflow.com/questions/50843584) and pending implementation for Python (issuetracker.google.com/122902300). – MonicaPC, Jan 16 at 17:32











• Update: Looks like the 2.12.0 release candidate for Apache Beam includes a way to control the write method on WriteToBigQuery. I'm not sure what'll actually happen when you try to force the FILE_LOADS write method on a streaming Pub/Sub pipeline, but at least you can experiment now! github.com/apache/beam/blob/v2.12.0-RC2/sdks/python/apache_beam/… – Ryan Schuster, Apr 9 at 17:59
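Following up on that last comment: on Beam 2.12+ the Python WriteToBigQuery transform exposes a method parameter, so a sketch like the one below may let you force load jobs even from a streaming pipeline. The triggering frequency is a hypothetical value, batch_size is simply ignored for load jobs, and behaviour can vary by SDK version and runner.

import apache_beam as beam

def writeTable(table_name, batch_size=None):
    # batch_size only applies to streaming inserts; it is kept here just to
    # match the original call site and is ignored for FILE_LOADS.
    return beam.io.WriteToBigQuery(
        table=table_name,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
        triggering_frequency=5 * 60)  # seconds between load jobs (hypothetical value)

# On Dataflow, older SDK versions may also need the newer BigQuery sink enabled,
# e.g. via --experiments=use_beam_bq_sink.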











