pika for RabbitMQ crashing while using Flask server
We have a single-threaded Flask server that receives requests from a Python app client. In this Flask server we use RabbitMQ, through the pika library, to distribute messages to other clients.
In the get function, the program crashes with this error:
pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected
content header for class 60, got non content header frame instead')
I've searched many topics about this on Stack Overflow and elsewhere, but they all address problems with multithreading, which we believe is not our case. As far as we know, Flask serves with only one thread unless it is started with app.run(threaded=True).
The program normally crashes when multiple messages are sent in a short interval (e.g. 5 per second). It is also important to note that the client polls for messages every second with a request to this function:
    @app.route('/api/users/getMessages', methods=['POST'])
    def get_Messages():
        data = json.loads(request.data)
        token = data['token']
        payload = jwt.decode(token, 'SECRET', algorithms=['HS256'])
        istid = payload['istid']
        print('istid: ' + istid)
        messages = []
        # `channel` is a pika channel created once, at module level, when the app starts
        queue = channel.queue_declare(queue=istid)
        for _ in range(queue.method.message_count):
            method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True)
            if method_frame:
                #print(method_frame, header_frame, body)
                messages.append(body)
            else:
                print('No message returned')
        res = {'messages': messages, 'error': 0}
        return jsonify(res)
With this code it normally crashes on the line:
    queue = channel.queue_declare(queue=istid)
We also tried replacing the for loop with a while loop that ends when body is None. In that case it crashes on the line:
    method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True)
Also important: the crashes are random. It can work a few times and then crash after a get request while messages are being sent. If anyone knows anything related to this, we would appreciate any help.
Another note: we thought about using basic_consume with a callback instead of basic_get, but we didn't find a way to make that work, since we have to send the messages back in the response and several users make requests to this same function.
EDIT #1:
In the RabbitMQ docs, if you search for the function "def basic_get", you will notice there are some TODO comments and also this note:
Due to implementation details, this cannot be called a second time
until the callback is executed.
So I suspect this could be what is happening, but even if it is, I don't know how it could be solved.
python python-3.x flask rabbitmq pika
Where are you creating your connection and channel objects?
– Luke Bakken
Dec 31 '18 at 15:46
Your current code isn't thread-safe. You'll need to check pika for a thread-safe example, or create a connection/channel within get_Messages.
– eandersson
Dec 31 '18 at 19:31
@LukeBakken I'm creating the connection and the channel at the beginning, outside the functions (in other words, only once, when we run the app). We tried opening and closing channels every time we need to get/send a message; it still crashed. We then tried to also open/close connection every time we had to get/send a message. It worked, but we think it is too resource-intensive and a bad workaround.
– Miguel
Jan 1 at 0:54
@eandersson What do you mean? We are not using threads. As far as my knowledge goes the code runs in only one thread as we are not using threading nor telling flask to run on multiple threads. Am I making any wrong assumption?
– Miguel
Jan 1 at 0:56
"We then tried to also open/close connection every time we had to get/send a message. It worked..." - this is proof that, in the version of your code that has the issue, multiple threads are accessing your connection and channel. If you turn on Pika's debug output, I think you may see the thread ID in the log messages, or you could add that information with a custom log formatter.
– Luke Bakken
Jan 1 at 13:28
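The custom log formatter suggested above could look roughly like the following. This is a minimal sketch, not something taken from the thread: it assumes pika logs through Python's standard logging module under logger names starting with "pika", and the names LOG_FORMAT and configure_thread_logging are made up for illustration.

```python
import logging
import threading

# %(thread)d and %(threadName)s are standard LogRecord attributes, so every
# log line shows which thread emitted it.
LOG_FORMAT = (
    "%(asctime)s %(levelname)s "
    "[thread=%(thread)d %(threadName)s] %(name)s: %(message)s"
)


def configure_thread_logging(level=logging.DEBUG):
    """Attach a stream handler whose format includes the thread id and name.

    Hypothetical helper: call it once at startup, before the first request.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(level)
    return handler


if __name__ == "__main__":
    configure_thread_logging()
    # Assumption: pika's loggers live under the "pika" name.
    logging.getLogger("pika").setLevel(logging.DEBUG)
    logging.getLogger("demo").debug(
        "logged from %s", threading.current_thread().name
    )
```

If log lines for a single channel show more than one thread id, the request handlers are running on several threads.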
asked Dec 30 '18 at 13:58, edited Jan 1 at 1:07
– Miguel
1 Answer
For anyone interested in the solution: as the comments point out, the program was not thread-safe, because as of version 1.0 Flask's development server runs with threaded=True by default.
The solution is either:
1) Run Flask with app.run(threaded=False).
2) Make the program thread-safe by taking a lock whenever the pika channel/connection is accessed.
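Option 2 can be sketched as follows. This is an illustration under assumptions, not the code we ended up running: LockedChannel and FakeChannel are hypothetical names, FakeChannel only stands in for a pika channel so the sketch runs without a broker, and the wrapper relies on basic_get returning (None, None, None) when the queue is empty, as pika's blocking channel does.

```python
import threading


class LockedChannel:
    """Wrap a channel-like object so only one thread uses it at a time."""

    def __init__(self, channel):
        self._channel = channel
        self._lock = threading.Lock()

    def drain(self, queue_name, **get_kwargs):
        """Return every message currently available on queue_name.

        The lock is held for the whole loop so another request thread
        cannot interleave its own frames on the same channel.
        """
        messages = []
        with self._lock:
            while True:
                method_frame, _header, body = self._channel.basic_get(
                    queue=queue_name, **get_kwargs
                )
                if method_frame is None:  # queue is empty
                    break
                messages.append(body)
        return messages


class FakeChannel:
    """Hypothetical stand-in for a pika channel (no broker needed)."""

    def __init__(self, bodies):
        self._bodies = list(bodies)

    def basic_get(self, queue, **kwargs):
        if self._bodies:
            return object(), object(), self._bodies.pop(0)
        return None, None, None


if __name__ == "__main__":
    shared = LockedChannel(FakeChannel(f"msg-{i}".encode() for i in range(5)))
    print(shared.drain("some-user-queue"))
```

In the real handler, every other call on the shared channel (queue_declare, basic_publish and so on) would have to go through the same lock. Note also that pika's own FAQ advises against sharing one connection across threads, so the lock is best seen as a pragmatic workaround; option 1 or one connection per thread avoids the sharing altogether.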
answered Jan 1 at 16:15
– Miguel