RabbitMQ with Python: Acknowledging and rejecting messages individually while using executemany()



























I plan to receive over 5 million messages a day with RabbitMQ, and I only control the consumer side. I want to use executemany() to store the messages in batches, but occasionally a single message raises an exception. I want to reject just those individual messages and acknowledge the rest. How can I selectively reject and acknowledge messages after an executemany()?



insert_sql = '''MERGE INTO table1
USING (
SELECT :col1 COL1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9') COL2,
:col3 COL3, :col4 COL4
FROM DUAL
) source_table
ON (table1.COL1 = source_table.COL1 AND table1.COL2 = source_table.COL2
AND table1.COL3 = source_table.COL3 AND table1.COL4 = source_table.COL4)
WHEN NOT MATCHED THEN
INSERT (COL1, COL2, COL3, COL4, COL5, COL6, COL8)
VALUES (:col1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9'), :col3,
:col4, :col5, xmltype(:col6), xmltype(:col7))'''

def callback(ch, method, properties, body):
    # some code to clean up the body, snipping it off
    # ...
    if decompressed_j is None:
        _reject(delivery_tag=method.delivery_tag, requeue=False)
    elif isinstance(decompressed_j, dict):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        global acx_list
        acx = ACX_Obj(decompressed_j, raw_jsonArray)
        if acx.response.startswith('<someignorabletag>'):
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        else:
            acx_list.append(acx.transactionValues)
            if len(acx_list) % 20 == 0:
                try:
                    ora_exe(acx_list, ch, method)
                except AttributeError as att_e:
                    print(att_e)
                    ch.basic_reject(
                        delivery_tag=method.delivery_tag, requeue=False)
                    raise att_e
                # start a new batch once the current one has been flushed
                acx_list = []
    else:
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

def ora_exe(acx_list, ch, method):
    conn = cx_Oracle.connect('dummy1/dummyenv', threaded=True)
    c = conn.cursor()
    try:
        c.executemany(insert_sql, acx_list)
    except cx_Oracle.IntegrityError as integrity_err:
        print('integrity err')
        conn.rollback()
    except cx_Oracle.DatabaseError as db_err:
        print(db_err)
        conn.rollback()
    else:
        conn.commit()
    finally:
        c.close()
        conn.close()
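
One direction that might fit the question: cx_Oracle's executemany() accepts a batcherrors=True flag, and cursor.getbatcherrors() then reports the offset of each failing row (as far as I know this needs Oracle Database 12.1 or later on both client and server). The sketch below is not the asker's code. It assumes the callback stops acknowledging on receipt and instead buffers (delivery_tag, row) pairs. The name flush_batch and the pending list are hypothetical, and channel and conn are only expected to behave like a pika channel and a cx_Oracle connection.

```python
def flush_batch(channel, conn, sql, pending):
    """Store all buffered rows in one round trip, then settle each message.

    `pending` is a list of (delivery_tag, row) pairs that have NOT been
    acknowledged yet. Returns (acked_tags, rejected_tags).
    """
    tags = [tag for tag, _ in pending]
    rows = [row for _, row in pending]

    cursor = conn.cursor()
    try:
        # batcherrors=True asks the driver to continue past failing rows
        # instead of raising on the first one.
        cursor.executemany(sql, rows, batcherrors=True)
        # Each batch error carries the offset of the row that failed.
        failed = {err.offset for err in cursor.getbatcherrors()}
        conn.commit()
    except Exception:
        # The statement as a whole failed, so no row was stored.
        conn.rollback()
        failed = set(range(len(rows)))
    finally:
        cursor.close()

    acked, rejected = [], []
    for offset, tag in enumerate(tags):
        if offset in failed:
            channel.basic_reject(delivery_tag=tag, requeue=False)
            rejected.append(tag)
        else:
            channel.basic_ack(delivery_tag=tag)
            acked.append(tag)
    return acked, rejected
```

In the callback this would replace the early basic_ack: append (method.delivery_tag, acx.transactionValues) to the buffer and call flush_batch(ch, conn, insert_sql, buffer) once 20 pairs have accumulated. For a failure of the whole statement (a dropped connection, say) requeue=True may be the better choice, since the rows themselves are probably fine.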


































  • Could you break your transactions into manageable sizes and then, if you hit an exception, run through each message in that transaction individually? 5M messages a day shouldn't be too horrible for that kind of paradigm, though it somewhat negates the benefit of transacted sessions anyway.

    – C.Nivs
    Dec 31 '18 at 23:43











  • Though "manageable" here is completely subjective.

    – C.Nivs
    Dec 31 '18 at 23:43











  • It could hold as a temporary solution, but remember that each message is appended to a list before the list is passed to executemany(). If executemany() hits an exception and I start looping through the list to call execute() on each item, I can no longer reject the older messages that were appended first.

    – Maple Zhang
    Jan 2 at 15:03
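
The row-by-row fallback discussed in these comments can still settle older messages if each row is buffered together with its delivery tag and nothing is acknowledged until after the database call. A minimal sketch under those assumptions follows. AckBuffer and its method names are hypothetical, channel is only expected to offer basic_ack and basic_reject the way a pika channel does, and conn is any DB-API style connection.

```python
class AckBuffer:
    """Buffers (delivery_tag, row) pairs and settles them after each flush."""

    def __init__(self, channel, conn, sql, batch_size=20):
        self.channel = channel
        self.conn = conn
        self.sql = sql
        self.batch_size = batch_size
        self.pending = []

    def add(self, delivery_tag, row):
        # No ack here: the message stays unacknowledged until it is stored.
        self.pending.append((delivery_tag, row))
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        batch, self.pending = self.pending, []
        if not batch:
            return
        cursor = self.conn.cursor()
        try:
            try:
                # Fast path: the whole batch in one call.
                cursor.executemany(self.sql, [row for _, row in batch])
                self.conn.commit()
            except Exception:
                # Undo the partial batch, then retry one row at a time.
                self.conn.rollback()
                self._flush_one_by_one(cursor, batch)
            else:
                for tag, _ in batch:
                    self.channel.basic_ack(delivery_tag=tag)
        finally:
            cursor.close()

    def _flush_one_by_one(self, cursor, batch):
        for tag, row in batch:
            try:
                cursor.execute(self.sql, row)
                self.conn.commit()
            except Exception:
                # Only the row that actually fails is rejected.
                self.conn.rollback()
                self.channel.basic_reject(delivery_tag=tag, requeue=False)
            else:
                self.channel.basic_ack(delivery_tag=tag)
```

The callback would then call buffer.add(method.delivery_tag, acx.transactionValues) instead of acknowledging right away. If a prefetch limit is set with basic_qos, it has to be at least batch_size, otherwise the broker stops delivering before a batch ever fills.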
















python rabbitmq mysql-python














edited Jan 1 at 3:32 by Bob Jarvis

asked Dec 31 '18 at 23:08 by Maple Zhang












