Pika for RabbitMQ crashing while using a Flask server



























We have a single-threaded Flask server that receives requests from a Python client app. In this Flask server we use RabbitMQ with the pika library to distribute messages to other clients.
In the get function, the program crashes with this error:




pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected
content header for class 60, got non content header frame instead')




I've searched many topics about this on Stack Overflow and elsewhere, but they all address multithreading problems, which is not our case. Flask should only serve with one thread unless it is started with app.run(threaded=True).



The program usually crashes when multiple messages are sent in a short interval (e.g. 5 per second). It is also important to note that messages are fetched every second with a request to this function:



@app.route('/api/users/getMessages', methods=['POST'])
def get_Messages():
    data = json.loads(request.data)
    token = data['token']

    payload = jwt.decode(token, 'SECRET', algorithms=['HS256'])
    istid = payload['istid']
    print('istid: '+istid)

    messages = []

    # channel is the module-level pika channel created once at startup
    queue = channel.queue_declare(queue=istid)
    for i in range(queue.method.message_count):
        method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True)
        if method_frame:
            #print(method_frame, header_frame, body)
            messages.append(body)
        else:
            print('No message returned')

    res = {'messages':messages, 'error':0}
    return jsonify(res)


With this code it usually crashes on the line:



queue = channel.queue_declare(queue=istid)


We also tried changing the code to use a while loop instead of the for loop, ending when body is None. In that case it crashes on the line:
method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True)
The crashes are also random: it can work a few times and then crash after a get request while messages are being sent. If anyone knows anything related to this, we would appreciate any help.



One more note: we thought about using basic_consume with a callback instead of basic_get, but we didn't find a way to make this work, since we have to send the messages back and have several users making requests to this same function.



EDIT #1:
In the RabbitMQ docs, if you search for the function "def basic_get", you will notice some TODO comments and also this note:




Due to implementation details, this cannot be called a second time
until the callback is executed.




So I suspected that this could be what was happening, but even if it is, I don't know how it could be solved.



































  • Where are you creating your connection and channel objects?

    – Luke Bakken
    Dec 31 '18 at 15:46






  • Your current code isn't thread-safe. You'll need to check pika for a thread-safe example, or create a connection/channel within get_Messages.

    – eandersson
    Dec 31 '18 at 19:31











  • @LukeBakken I'm creating the connection, as well as the channel, at the beginning outside the functions (in other words, only once when we run the app). We tried opening and closing channels any time we need to get/send a message, and it still crashed. We then also tried opening/closing the connection every time we had to get/send a message. It worked, but we think it is too resource-intensive and a bad workaround.

    – Miguel
    Jan 1 at 0:54













  • @eandersson What do you mean? We are not using threads. As far as my knowledge goes the code runs in only one thread as we are not using threading nor telling flask to run on multiple threads. Am I making any wrong assumption?

    – Miguel
    Jan 1 at 0:56






  • "We then tried to also open /close connection every time we had to get/send a message. It worked..." - this is proof that multiple threads are accessing your connections and channels in your current code that has an issue. If you turn on Pika's debug output I think you may see the thread ID in the log message, or you could add that information with a custom log formatter.

    – Luke Bakken
    Jan 1 at 13:28
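
To follow up on the thread-ID suggestion in the comments, here is a minimal sketch of a log formatter that prints the calling thread, using only the standard logging module. The function name configure_thread_logging, the logger name "demo", and handle_request are made up for illustration. It assumes pika logs through the standard logging module under logger names starting with "pika", so setting the root logger to DEBUG should surface its debug output too.

```python
import logging
import threading

# %(threadName)s and %(thread)d are standard LogRecord attributes.
LOG_FORMAT = (
    "%(asctime)s %(levelname)s [%(threadName)s/%(thread)d] "
    "%(name)s: %(message)s"
)


def configure_thread_logging(stream=None, level=logging.DEBUG):
    """Attach a root handler whose formatter includes the calling thread."""
    handler = logging.StreamHandler(stream)  # stream=None means sys.stderr
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(level)
    return handler


if __name__ == "__main__":
    configure_thread_logging()
    log = logging.getLogger("demo")  # hypothetical logger name

    def handle_request(n):
        # Stand-in for a Flask view; each call logs from its own thread.
        log.debug("handling request %d", n)

    workers = [
        threading.Thread(target=handle_request, args=(i,)) for i in range(3)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```

If two requests to the same view show different thread names or ids in the log, the server is handling requests on more than one thread.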


















python python-3.x flask rabbitmq pika














edited Jan 1 at 1:07







Miguel

















asked Dec 30 '18 at 13:58









Miguel



























1 Answer
































For anyone interested in the solution: as noted in the comments, the program was not thread-safe, because as of version 1.0 Flask's development server uses threaded=True by default.

The solution is either:

1) Running Flask with app.run(threaded=False)

2) Making the program thread-safe by taking a lock whenever accessing the pika channel/connection.
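
Option 2 can be sketched roughly as below. This is an illustration under stated assumptions, not the exact code we ended up with: channel_lock and drain_queue are names invented here, and FakeChannel is a hypothetical stand-in so the example runs without a broker. The basic_get call mirrors the one in the question (no_ack=True; pika 1.0 renamed that argument to auto_ack).

```python
import threading
import time

# One lock per shared connection/channel. Every code path that touches
# the channel (publishing included) would need to take this same lock.
channel_lock = threading.Lock()


def drain_queue(channel, queue_name):
    """Fetch all waiting messages while holding the lock."""
    messages = []
    with channel_lock:
        while True:
            method_frame, header_frame, body = channel.basic_get(
                queue=queue_name, no_ack=True
            )
            if method_frame is None:  # queue is empty
                break
            messages.append(body)
    return messages


class FakeChannel:
    """Hypothetical stand-in for a pika channel; counts overlapping calls."""

    def __init__(self, bodies):
        self._bodies = list(bodies)
        self._busy = False
        self.overlaps = 0

    def basic_get(self, queue, no_ack=True):
        if self._busy:
            self.overlaps += 1  # another thread is in the middle of a call
        self._busy = True
        time.sleep(0.001)  # widen the race window
        body = self._bodies.pop() if self._bodies else None
        self._busy = False
        if body is None:
            return (None, None, None)
        return ("frame", None, body)


if __name__ == "__main__":
    fake = FakeChannel("message-%d" % i for i in range(10))
    collected = []
    threads = [
        threading.Thread(
            target=lambda: collected.append(drain_queue(fake, "demo"))
        )
        for _ in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("overlapping calls:", fake.overlaps)
    print("messages fetched:", sum(len(batch) for batch in collected))
```

The lock is held for the whole drain rather than per call, so one request finishes its sequence of channel operations before another starts. The cost is that concurrent requests wait on each other, which is the same serialization that threaded=False gives you.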






        answered Jan 1 at 16:15









        Miguel





























