Why is my PySpark dataframe join operation writing an empty result?

I have two PySpark dataframes that I'm trying to join into a new dataframe. The join operation seems to produce an empty dataframe when I show() it.
I'm evaluating the code in a Jupyter notebook with a PySpark kernel, on a cluster with a single master, 4 workers, and YARN for resource allocation.
from pyspark.sql.functions import monotonically_increasing_id, udf
from pyspark.sql.types import FloatType
from pyspark.mllib.linalg import DenseVector

firstelement = udf(lambda v: float(v[1]), FloatType())

a = [{'c_id': 'a', 'cv_id': 'b', 'id': 1}, {'c_id': 'c', 'cv_id': 'd', 'id': 2}]
ip = spark.createDataFrame(a)
b = [{'probability': DenseVector([0.99, 0.01]), 'id': 1},
     {'probability': DenseVector([0.6, 0.4]), 'id': 2}]
op = spark.createDataFrame(b)

op.show()  # shows the df
# probability, id
# [0.99, 0.01], 1
## probability is a dense vector, id is bigint

ip.show()  # shows the df
# c_id, cv_id, id
# a, b, 1
## c_id and cv_id are strings, id is bigint

op_final = (op.join(ip, ip.id == op.id)
              .select('c_id', 'cv_id', firstelement('probability'))
              .withColumnRenamed('<lambda>(probability)', 'probability'))
op_final.show()  # gives a null df

# but the below seems to work, however quite slowly
ip.collect()
op.collect()
op_final.collect()
op_final.show()  # shows the joined df
Perhaps it's my lack of expertise with Spark, but could someone please explain why I'm able to see the first two dataframes, but not the joined dataframe unless I use collect()?
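As a cheaper alternative to the full collect() calls above, I could presumably force materialization like this (a sketch only, assuming lazy evaluation is what's at play here; cache() and count() are standard DataFrame methods):

# continuing from the snippet above (sketch, not verified on this cluster)
op_final.cache()   # keep the joined result around after it is first computed
op_final.count()   # an action that forces the join to actually execute
op_final.show()    # should now display the cached, materialized rows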
python apache-spark pyspark
asked Dec 27 '18 at 16:31 by lejin · edited Dec 27 '18 at 17:04
It's a udf; it's defined in row 3: firstelement = udf(lambda v: float(v[1]), FloatType())
– lejin
Dec 27 '18 at 16:38
Sorry, I missed that. You can also do firstelement('probability').alias("probability") and avoid the withColumnRenamed call after.
– pault
Dec 27 '18 at 16:39
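Spelled out against the example data above, that suggestion would look like this (a sketch; Column.alias is a standard PySpark method):

op_final = (op.join(ip, ip.id == op.id)
              .select('c_id', 'cv_id',
                      firstelement('probability').alias('probability')))
op_final.show()  # column is already named 'probability'; no withColumnRenamed needed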
Ok, cool, I'll try that to optimize my code. Any thoughts on the issue, though?
– lejin
Dec 27 '18 at 16:41
Sorry, I can't recreate your issue. Your code works fine for me. I tried both pyspark.ml.linalg.DenseVector and pyspark.mllib.linalg.DenseVector. Can you add in some code to recreate your example DataFrames?
– pault
Dec 27 '18 at 16:46
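For reference, the two vector classes pault mentions live in different modules (both import paths exist in PySpark; ml.linalg is the DataFrame-based API and mllib.linalg the older RDD-based one):

from pyspark.ml.linalg import DenseVector as MLDenseVector        # DataFrame API
from pyspark.mllib.linalg import DenseVector as MLlibDenseVector  # legacy RDD API

v = MLDenseVector([0.99, 0.01])
print(float(v[1]))  # 0.01 -- indexing behaves the same for both classes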
@pault I have wondered whether it has something to do with YARN's memory allocation, since I have observed this sometimes work intermittently. But I haven't been able to replicate the exact circumstances under which it works and when it doesn't.
– lejin
Dec 27 '18 at 16:50