Celery tasks not being completely removed from queue





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty{ height:90px;width:728px;box-sizing:border-box;
}







0















I have a celery task that calls itself (with do_stuff.apply_async(queue="foo")). Previously I have ran app.control.add_consumer("foo", reply=True) so my workers can consume from this queue.



After some amount of time, I want to stop all the tasks from that queue and all running tasks that were launched from do_stuff.



So I run this code:



app.control.cancel_consumer("foo", reply=True)

i = app.control.inspect()
for queue in [i.active, i.scheduled, i.reserved]:
for worker_name, worker_tasks in queue().items():
for task in worker_tasks:
args = ast.literal_eval(task["args"])
if "do_stuff" in task["name"] and args[0] == crawler.name:
app.control.revoke(task["id"], terminate=True)


This "works" kind of. It does stop all the running tasks from do_stuff and it does clear the scheduled tasks (or at least I can't see any in Flower, after running this code).



The problem is that if I run app.control.add_consumer("foo", reply=True) again, without running anything else, new tasks start running. That means that celery/redis, somehow, manages to keep tasks somewhere.



Why is that happening? Where are those "hidden" tasks saved? And how can I remove them?










share|improve this question































    0















    I have a celery task that calls itself (with do_stuff.apply_async(queue="foo")). Previously I have ran app.control.add_consumer("foo", reply=True) so my workers can consume from this queue.



    After some amount of time, I want to stop all the tasks from that queue and all running tasks that were launched from do_stuff.



    So I run this code:



    app.control.cancel_consumer("foo", reply=True)

    i = app.control.inspect()
    for queue in [i.active, i.scheduled, i.reserved]:
    for worker_name, worker_tasks in queue().items():
    for task in worker_tasks:
    args = ast.literal_eval(task["args"])
    if "do_stuff" in task["name"] and args[0] == crawler.name:
    app.control.revoke(task["id"], terminate=True)


    This "works" kind of. It does stop all the running tasks from do_stuff and it does clear the scheduled tasks (or at least I can't see any in Flower, after running this code).



    The problem is that if I run app.control.add_consumer("foo", reply=True) again, without running anything else, new tasks start running. That means that celery/redis, somehow, manages to keep tasks somewhere.



    Why is that happening? Where are those "hidden" tasks saved? And how can I remove them?










    share|improve this question



























      0












      0








      0








      I have a celery task that calls itself (with do_stuff.apply_async(queue="foo")). Previously I have ran app.control.add_consumer("foo", reply=True) so my workers can consume from this queue.



      After some amount of time, I want to stop all the tasks from that queue and all running tasks that were launched from do_stuff.



      So I run this code:



      app.control.cancel_consumer("foo", reply=True)

      i = app.control.inspect()
      for queue in [i.active, i.scheduled, i.reserved]:
      for worker_name, worker_tasks in queue().items():
      for task in worker_tasks:
      args = ast.literal_eval(task["args"])
      if "do_stuff" in task["name"] and args[0] == crawler.name:
      app.control.revoke(task["id"], terminate=True)


      This "works" kind of. It does stop all the running tasks from do_stuff and it does clear the scheduled tasks (or at least I can't see any in Flower, after running this code).



      The problem is that if I run app.control.add_consumer("foo", reply=True) again, without running anything else, new tasks start running. That means that celery/redis, somehow, manages to keep tasks somewhere.



      Why is that happening? Where are those "hidden" tasks saved? And how can I remove them?










      share|improve this question
















      I have a celery task that calls itself (with do_stuff.apply_async(queue="foo")). Previously I have ran app.control.add_consumer("foo", reply=True) so my workers can consume from this queue.



      After some amount of time, I want to stop all the tasks from that queue and all running tasks that were launched from do_stuff.



      So I run this code:



      app.control.cancel_consumer("foo", reply=True)

      i = app.control.inspect()
      for queue in [i.active, i.scheduled, i.reserved]:
      for worker_name, worker_tasks in queue().items():
      for task in worker_tasks:
      args = ast.literal_eval(task["args"])
      if "do_stuff" in task["name"] and args[0] == crawler.name:
      app.control.revoke(task["id"], terminate=True)


      This "works" kind of. It does stop all the running tasks from do_stuff and it does clear the scheduled tasks (or at least I can't see any in Flower, after running this code).



      The problem is that if I run app.control.add_consumer("foo", reply=True) again, without running anything else, new tasks start running. That means that celery/redis, somehow, manages to keep tasks somewhere.



      Why is that happening? Where are those "hidden" tasks saved? And how can I remove them?







      django python-3.x celery






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Jan 4 at 14:55







      alexandernst

















      asked Jan 4 at 14:44









      alexandernstalexandernst

      4,9571451124




      4,9571451124
























          1 Answer
          1






          active

          oldest

          votes


















          1














          Answering to my own question: this happens because while I'm making the workers not to consume from the queue (by calling cancel_consumer), the queue itself still contains everything.



          I have found a way to (programatically) flush the queue:



          from celery.bin.celery import CeleryCommand
          cmd = CeleryCommand()
          super(CeleryCommand, cmd).execute_from_commandline([
          '',
          'purge',
          '-f',
          '-Q', queue_name,
          '-A', 'main'
          ])





          share|improve this answer


























            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54041147%2fcelery-tasks-not-being-completely-removed-from-queue%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            1














            Answering to my own question: this happens because while I'm making the workers not to consume from the queue (by calling cancel_consumer), the queue itself still contains everything.



            I have found a way to (programatically) flush the queue:



            from celery.bin.celery import CeleryCommand
            cmd = CeleryCommand()
            super(CeleryCommand, cmd).execute_from_commandline([
            '',
            'purge',
            '-f',
            '-Q', queue_name,
            '-A', 'main'
            ])





            share|improve this answer






























              1














              Answering to my own question: this happens because while I'm making the workers not to consume from the queue (by calling cancel_consumer), the queue itself still contains everything.



              I have found a way to (programatically) flush the queue:



              from celery.bin.celery import CeleryCommand
              cmd = CeleryCommand()
              super(CeleryCommand, cmd).execute_from_commandline([
              '',
              'purge',
              '-f',
              '-Q', queue_name,
              '-A', 'main'
              ])





              share|improve this answer




























                1












                1








                1







                Answering to my own question: this happens because while I'm making the workers not to consume from the queue (by calling cancel_consumer), the queue itself still contains everything.



                I have found a way to (programatically) flush the queue:



                from celery.bin.celery import CeleryCommand
                cmd = CeleryCommand()
                super(CeleryCommand, cmd).execute_from_commandline([
                '',
                'purge',
                '-f',
                '-Q', queue_name,
                '-A', 'main'
                ])





                share|improve this answer















                Answering to my own question: this happens because while I'm making the workers not to consume from the queue (by calling cancel_consumer), the queue itself still contains everything.



                I have found a way to (programatically) flush the queue:



                from celery.bin.celery import CeleryCommand
                cmd = CeleryCommand()
                super(CeleryCommand, cmd).execute_from_commandline([
                '',
                'purge',
                '-f',
                '-Q', queue_name,
                '-A', 'main'
                ])






                share|improve this answer














                share|improve this answer



                share|improve this answer








                edited Jan 4 at 16:08

























                answered Jan 4 at 15:43









                alexandernstalexandernst

                4,9571451124




                4,9571451124
































                    draft saved

                    draft discarded




















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54041147%2fcelery-tasks-not-being-completely-removed-from-queue%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    Monofisismo

                    Angular Downloading a file using contenturl with Basic Authentication

                    Olmecas