Why is my PySpark dataframe join operation writing an empty result?

I have two PySpark dataframes that I'm trying to join into a new dataframe. When I call show() on the result of the join, it comes back empty, as if the dataframe were null.

I'm evaluating the code in a Jupyter notebook with a PySpark kernel, on a cluster with a single master, 4 workers, and YARN for resource allocation.



from pyspark.sql.functions import monotonically_increasing_id, udf
from pyspark.sql.types import FloatType
from pyspark.mllib.linalg import DenseVector

# UDF that extracts the second element of the probability vector as a float
firstelement = udf(lambda v: float(v[1]), FloatType())

a = [{'c_id': 'a', 'cv_id': 'b', 'id': 1}, {'c_id': 'c', 'cv_id': 'd', 'id': 2}]
ip = spark.createDataFrame(a)
b = [{'probability': DenseVector([0.99, 0.01]), 'id': 1}, {'probability': DenseVector([0.6, 0.4]), 'id': 2}]
op = spark.createDataFrame(b)

op.show()  # shows the df
# probability, id
# [0.99, 0.01], 1
## probability is a dense vector, id is bigint

ip.show()  # shows the df
# c_id, cv_id, id
# a, b, 1
## c_id and cv_id are strings, id is bigint

op_final = (op.join(ip, ip.id == op.id)
              .select('c_id', 'cv_id', firstelement('probability'))
              .withColumnRenamed('<lambda>(probability)', 'probability'))

op_final.show()  # gives a null df

# but the following works, although it is quite slow
ip.collect()
op.collect()
op_final.collect()
op_final.show()  # now shows the joined df


Perhaps it's my lack of expertise with Spark, but could someone explain why I can see the first two dataframes, but not the joined dataframe, unless I call collect() first?
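
For reference, a minimal sketch of forcing the join to materialize without pulling every row to the driver. cache() and count() are standard DataFrame methods, but whether this sidesteps the empty show() here, the way collect() appears to, is an assumption:

# Hedged sketch: persist the joined dataframe and force one evaluation,
# so later actions such as show() read the already-materialized result.
op_final.cache()   # mark the joined dataframe for in-memory persistence
op_final.count()   # lightweight action: triggers the join and populates the cache
op_final.show()    # should now display the joined rows, as after collect()
# Assumption: materializing once via count() has the same effect here as the
# collect() calls above, without returning all rows to the driver.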
python apache-spark pyspark

edited Dec 27 '18 at 17:04

asked Dec 27 '18 at 16:31 by lejin (133)

  • It's a UDF; it's defined just after the imports: firstelement=udf(lambda v:float(v[1]),FloatType())
    – lejin
    Dec 27 '18 at 16:38

  • Sorry, I missed that. You can also do firstelement('probability').alias("probability") and avoid the withColumnRenamed call afterwards (see the sketch after these comments).
    – pault
    Dec 27 '18 at 16:39

  • OK, cool, I'll try that to tidy up my code. Any thoughts on the issue, though?
    – lejin
    Dec 27 '18 at 16:41

  • Sorry, I can't recreate your issue. Your code works fine for me. I tried both pyspark.ml.linalg.DenseVector and pyspark.mllib.linalg.DenseVector. Can you add in some code to recreate your example DataFrames?
    – pault
    Dec 27 '18 at 16:46

  • @pault I have wondered whether it has something to do with YARN's memory allocation, since I have seen this work intermittently. But I haven't been able to pin down the exact circumstances under which it works and when it doesn't.
    – lejin
    Dec 27 '18 at 16:50
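
Following up on pault's alias suggestion, a minimal sketch of the cleaner select, assuming the same ip, op, and firstelement as in the question (per pault's test, either the pyspark.ml.linalg or pyspark.mllib.linalg DenseVector import works for the example data):

# Hedged sketch of pault's suggestion: alias the UDF column directly,
# avoiding the auto-generated '<lambda>(probability)' column name entirely.
op_final = (op.join(ip, ip.id == op.id)
              .select('c_id', 'cv_id',
                      firstelement('probability').alias('probability')))
op_final.show()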