Beam pipeline (Python): output not getting written to local disk
I'm learning windowing & triggering concepts in Apache Beam, with the intention to:
- read from an unbounded source (Pub/Sub)
- write the incoming messages to local disk at every 5 sec fixed-window interval

ISSUE: no output gets written to local disk (the pipeline did create a beam-temp-* folder and wrote some files in there, but no output.csv is written to the intended destination every 5 sec).

- running apache-beam==2.9.0, Python 2.7.10
- tried both DirectRunner and DataflowRunner (with a GCS bucket as the destination)

Here's the code:
import apache_beam as beam

# `options` and SUBSCRIBER are defined earlier in the script.
p = beam.Pipeline(options=options)

# 1) Read incoming messages from the Pub/Sub subscription,
# 2) and apply 5 sec fixed windowing
lines = (p
         | "read_sub" >> beam.io.gcp.pubsub.ReadFromPubSub(
             topic=None, subscription=SUBSCRIBER, with_attributes=True)
         | "window" >> beam.WindowInto(beam.window.FixedWindows(5)))

# 3) Extract the message payload
output = lines | "pardo" >> beam.Map(lambda x: x.data)

# 4) Write out to local disk
output | "write" >> beam.io.WriteToText(
    'output', file_name_suffix='.csv',
    header='time, colname1, colname2')

p.run().wait_until_finish()
Thanks a lot in advance for any advice! Cheers!
Tags: python-2.7 google-cloud-dataflow apache-beam
When running with the direct runner, can you check the output in the console? Is there any relevant information there? Any errors?
– Anton
Jan 2 at 23:52
No, nothing; only the deprecation warning: /Applications/py_venvs/dataflow_py2/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py:365: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
– Vibhor Jain
Jan 2 at 23:54
Have you checked that there are no sharded files? Something like output-0000-of-0010.csv?
– Pablo
Jan 3 at 17:55
I checked already. DirectRunner only created a beam-temp-* directory where the expected output files are dumped (temporarily?!), but nothing gets written to the expected output directory (python pipeline.py --runner DirectRunner --input input.csv --output output --streaming)
– Vibhor Jain
Jan 3 at 18:37
Similar behaviour with the Dataflow runner as well, except that it outputs sharded files, still in a beam-temp-* directory; nothing is written to the gs:// bucket location I passed as an argument to beam.io.WriteToText()
– Vibhor Jain
Jan 3 at 18:45
1 Answer
You are reading from an unbounded source and trying to write to a bounded sink. The Beam Java API supports this via the withWindowedWrites method, but it is not supported in Python yet, which is a long-awaited feature. So you need to either switch to Java or write to BigQuery instead.
– Tanveer Uddin
answered Jan 15 at 21:39
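As an illustration of the BigQuery route, here is a minimal sketch reusing the question's output PCollection; the table spec 'my-project:my_dataset.messages' and the one-column schema are made-up placeholders, not from the original pipeline:

import apache_beam as beam

# Streaming inserts into BigQuery accept an unbounded PCollection,
# so no windowed-write support is needed on the sink side.
(output
 | 'to_row' >> beam.Map(lambda data: {'payload': data})
 | 'write_bq' >> beam.io.WriteToBigQuery(
     'my-project:my_dataset.messages',   # hypothetical table
     schema='payload:STRING',            # hypothetical schema
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))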
Won't applying the AfterWatermark() trigger solve this? As per my understanding, it should emit the result at each window close and output it to the text file...
– Vibhor Jain
Jan 17 at 2:49
Unfortunately no. If you change your write sink to Pub/Sub or BigQuery, you will find that the triggering works perfectly. It's just that the Python API does not yet support writing an unbounded PCollection to a text file.
– Tanveer Uddin
Jan 17 at 23:34
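Until windowed writes land in the Python SDK, one hand-rolled workaround for local per-window files is to batch each window with GroupByKey and write the batch from a DoFn. A rough sketch, assuming one file per fired window is acceptable; the single-key batching and the file-naming scheme below are my own assumptions, not an official sink:

import apache_beam as beam

class WriteWindowToFile(beam.DoFn):
    # Writes one local CSV file per fired window; a workaround, not an official sink.
    def process(self, keyed_batch, window=beam.DoFn.WindowParam):
        _, rows = keyed_batch  # GroupByKey output: (key, iterable of payloads)
        # Hypothetical naming scheme: window start in epoch seconds.
        path = 'output-%d.csv' % (window.start.micros // 1000000)
        with open(path, 'w') as f:
            for row in rows:
                f.write(row + '\n')

# 'output' is the already-windowed PCollection from the question.
(output
 | 'key_all' >> beam.Map(lambda data: (None, data))  # single key collects each window's batch
 | 'batch' >> beam.GroupByKey()                      # default trigger fires at window close
 | 'write_files' >> beam.ParDo(WriteWindowToFile()))

Note this only makes sense with the DirectRunner; on Dataflow the files would land on worker-local disk rather than anywhere you can easily reach.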