RabbitMQ with Python: Acknowledging and rejecting messages individually while using executemany()
I plan on receiving over 5 million messages a day with rabbitmq and I only have control over the consumer side. I wanted to use executemany to store the messages in batches but sometimes a message may be caught in an exception and I want just those individual messages to be rejected and simply acknowledge the rest. How can I selectively reject and acknowledge after an executemany()?
insert_sql = '''MERGE INTO table1
USING (
SELECT :col1 COL1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9') COL2,
:col3 COL3, :col4 COL4
FROM DUAL
) source_table
ON (table1.COL1 = source_table.COL1 AND table1.COL2 = source_table.COL2
AND table1.COL3 = source_table.COL3 AND table1.COL4 = source_table.COL4)
WHEN NOT MATCHED THEN
INSERT (COL1, COL2, COL3, COL4, COL5, COL6, COL8)
VALUES (:col1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9'), :col3,
:col4, :col5, xmltype(:col6), xmltype(:col7))'''
def callback(ch, method, properties, body):
# some code to clean up the body, snipping it off
# ...
if decompressed_j == None:
_reject(delivery_tag=method.delivery_tag, requeue=False)
elif isinstance(decompressed_j, dict):
ch.basic_ack(delivery_tag=method.delivery_tag)
global acx_list
acx = ACX_Obj(decompressed_j, raw_jsonArray)
if acx.response.startswith('<someignorabletag>'):
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
else:
acx_list.append(acx.transactionValues)
if len(acx_list) % 20 == 0:
try:
ora_exe(acx_list, ch, method)
except AttributeError as att_e:
print(att_e)
ch.basic_reject(
delivery_tag=method.delivery_tag, requeue=False)
raise att_e
acx_list =
else:
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
def ora_exe(acx_list, ch, method):
conn = cx_Oracle.connect('dummy1/dummyenv', threaded=True)
c = conn.cursor()
try:
c.executemany(insert_sql, acx_list)
except cx_Oracle.IntegrityError as integrity_err:
print('integrity err')
conn.rollback()
except cx_Oracle.DatabaseError as db_err:
print(db_err)
conn.rollback()
else:
conn.commit()
finally:
c.close()
conn.close()
python rabbitmq mysql-python
add a comment |
I plan on receiving over 5 million messages a day with rabbitmq and I only have control over the consumer side. I wanted to use executemany to store the messages in batches but sometimes a message may be caught in an exception and I want just those individual messages to be rejected and simply acknowledge the rest. How can I selectively reject and acknowledge after an executemany()?
insert_sql = '''MERGE INTO table1
USING (
SELECT :col1 COL1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9') COL2,
:col3 COL3, :col4 COL4
FROM DUAL
) source_table
ON (table1.COL1 = source_table.COL1 AND table1.COL2 = source_table.COL2
AND table1.COL3 = source_table.COL3 AND table1.COL4 = source_table.COL4)
WHEN NOT MATCHED THEN
INSERT (COL1, COL2, COL3, COL4, COL5, COL6, COL8)
VALUES (:col1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9'), :col3,
:col4, :col5, xmltype(:col6), xmltype(:col7))'''
def callback(ch, method, properties, body):
# some code to clean up the body, snipping it off
# ...
if decompressed_j == None:
_reject(delivery_tag=method.delivery_tag, requeue=False)
elif isinstance(decompressed_j, dict):
ch.basic_ack(delivery_tag=method.delivery_tag)
global acx_list
acx = ACX_Obj(decompressed_j, raw_jsonArray)
if acx.response.startswith('<someignorabletag>'):
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
else:
acx_list.append(acx.transactionValues)
if len(acx_list) % 20 == 0:
try:
ora_exe(acx_list, ch, method)
except AttributeError as att_e:
print(att_e)
ch.basic_reject(
delivery_tag=method.delivery_tag, requeue=False)
raise att_e
acx_list =
else:
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
def ora_exe(acx_list, ch, method):
conn = cx_Oracle.connect('dummy1/dummyenv', threaded=True)
c = conn.cursor()
try:
c.executemany(insert_sql, acx_list)
except cx_Oracle.IntegrityError as integrity_err:
print('integrity err')
conn.rollback()
except cx_Oracle.DatabaseError as db_err:
print(db_err)
conn.rollback()
else:
conn.commit()
finally:
c.close()
conn.close()
python rabbitmq mysql-python
Could you break your transactions into manageable sizes, then if you hit an exception, run through each message in the single transaction? 5m messages shouldn't be too horrible for that kind of paradigm, though it kind of negates the benefit of transacted sessions anyways
– C.Nivs
Dec 31 '18 at 23:43
Thoughmanageable
here is completely subjective
– C.Nivs
Dec 31 '18 at 23:43
It could hold as a temporary solution but remember that each message is being appended into a list before it is passed into executemany() which means that if executemany() hits an exception and I start looping through each item in the list to do execute()'s, I cannot mark the older messages which were appended first as rejected.
– Maple Zhang
Jan 2 at 15:03
add a comment |
I plan on receiving over 5 million messages a day with rabbitmq and I only have control over the consumer side. I wanted to use executemany to store the messages in batches but sometimes a message may be caught in an exception and I want just those individual messages to be rejected and simply acknowledge the rest. How can I selectively reject and acknowledge after an executemany()?
insert_sql = '''MERGE INTO table1
USING (
SELECT :col1 COL1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9') COL2,
:col3 COL3, :col4 COL4
FROM DUAL
) source_table
ON (table1.COL1 = source_table.COL1 AND table1.COL2 = source_table.COL2
AND table1.COL3 = source_table.COL3 AND table1.COL4 = source_table.COL4)
WHEN NOT MATCHED THEN
INSERT (COL1, COL2, COL3, COL4, COL5, COL6, COL8)
VALUES (:col1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9'), :col3,
:col4, :col5, xmltype(:col6), xmltype(:col7))'''
def callback(ch, method, properties, body):
# some code to clean up the body, snipping it off
# ...
if decompressed_j == None:
_reject(delivery_tag=method.delivery_tag, requeue=False)
elif isinstance(decompressed_j, dict):
ch.basic_ack(delivery_tag=method.delivery_tag)
global acx_list
acx = ACX_Obj(decompressed_j, raw_jsonArray)
if acx.response.startswith('<someignorabletag>'):
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
else:
acx_list.append(acx.transactionValues)
if len(acx_list) % 20 == 0:
try:
ora_exe(acx_list, ch, method)
except AttributeError as att_e:
print(att_e)
ch.basic_reject(
delivery_tag=method.delivery_tag, requeue=False)
raise att_e
acx_list =
else:
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
def ora_exe(acx_list, ch, method):
conn = cx_Oracle.connect('dummy1/dummyenv', threaded=True)
c = conn.cursor()
try:
c.executemany(insert_sql, acx_list)
except cx_Oracle.IntegrityError as integrity_err:
print('integrity err')
conn.rollback()
except cx_Oracle.DatabaseError as db_err:
print(db_err)
conn.rollback()
else:
conn.commit()
finally:
c.close()
conn.close()
python rabbitmq mysql-python
I plan on receiving over 5 million messages a day with rabbitmq and I only have control over the consumer side. I wanted to use executemany to store the messages in batches but sometimes a message may be caught in an exception and I want just those individual messages to be rejected and simply acknowledge the rest. How can I selectively reject and acknowledge after an executemany()?
insert_sql = '''MERGE INTO table1
USING (
SELECT :col1 COL1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9') COL2,
:col3 COL3, :col4 COL4
FROM DUAL
) source_table
ON (table1.COL1 = source_table.COL1 AND table1.COL2 = source_table.COL2
AND table1.COL3 = source_table.COL3 AND table1.COL4 = source_table.COL4)
WHEN NOT MATCHED THEN
INSERT (COL1, COL2, COL3, COL4, COL5, COL6, COL8)
VALUES (:col1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9'), :col3,
:col4, :col5, xmltype(:col6), xmltype(:col7))'''
def callback(ch, method, properties, body):
# some code to clean up the body, snipping it off
# ...
if decompressed_j == None:
_reject(delivery_tag=method.delivery_tag, requeue=False)
elif isinstance(decompressed_j, dict):
ch.basic_ack(delivery_tag=method.delivery_tag)
global acx_list
acx = ACX_Obj(decompressed_j, raw_jsonArray)
if acx.response.startswith('<someignorabletag>'):
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
else:
acx_list.append(acx.transactionValues)
if len(acx_list) % 20 == 0:
try:
ora_exe(acx_list, ch, method)
except AttributeError as att_e:
print(att_e)
ch.basic_reject(
delivery_tag=method.delivery_tag, requeue=False)
raise att_e
acx_list =
else:
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
def ora_exe(acx_list, ch, method):
conn = cx_Oracle.connect('dummy1/dummyenv', threaded=True)
c = conn.cursor()
try:
c.executemany(insert_sql, acx_list)
except cx_Oracle.IntegrityError as integrity_err:
print('integrity err')
conn.rollback()
except cx_Oracle.DatabaseError as db_err:
print(db_err)
conn.rollback()
else:
conn.commit()
finally:
c.close()
conn.close()
python rabbitmq mysql-python
python rabbitmq mysql-python
edited Jan 1 at 3:32
Bob Jarvis
34.2k55985
34.2k55985
asked Dec 31 '18 at 23:08
Maple ZhangMaple Zhang
1
1
Could you break your transactions into manageable sizes, then if you hit an exception, run through each message in the single transaction? 5m messages shouldn't be too horrible for that kind of paradigm, though it kind of negates the benefit of transacted sessions anyways
– C.Nivs
Dec 31 '18 at 23:43
Thoughmanageable
here is completely subjective
– C.Nivs
Dec 31 '18 at 23:43
It could hold as a temporary solution but remember that each message is being appended into a list before it is passed into executemany() which means that if executemany() hits an exception and I start looping through each item in the list to do execute()'s, I cannot mark the older messages which were appended first as rejected.
– Maple Zhang
Jan 2 at 15:03
add a comment |
Could you break your transactions into manageable sizes, then if you hit an exception, run through each message in the single transaction? 5m messages shouldn't be too horrible for that kind of paradigm, though it kind of negates the benefit of transacted sessions anyways
– C.Nivs
Dec 31 '18 at 23:43
Thoughmanageable
here is completely subjective
– C.Nivs
Dec 31 '18 at 23:43
It could hold as a temporary solution but remember that each message is being appended into a list before it is passed into executemany() which means that if executemany() hits an exception and I start looping through each item in the list to do execute()'s, I cannot mark the older messages which were appended first as rejected.
– Maple Zhang
Jan 2 at 15:03
Could you break your transactions into manageable sizes, then if you hit an exception, run through each message in the single transaction? 5m messages shouldn't be too horrible for that kind of paradigm, though it kind of negates the benefit of transacted sessions anyways
– C.Nivs
Dec 31 '18 at 23:43
Could you break your transactions into manageable sizes, then if you hit an exception, run through each message in the single transaction? 5m messages shouldn't be too horrible for that kind of paradigm, though it kind of negates the benefit of transacted sessions anyways
– C.Nivs
Dec 31 '18 at 23:43
Though
manageable
here is completely subjective– C.Nivs
Dec 31 '18 at 23:43
Though
manageable
here is completely subjective– C.Nivs
Dec 31 '18 at 23:43
It could hold as a temporary solution but remember that each message is being appended into a list before it is passed into executemany() which means that if executemany() hits an exception and I start looping through each item in the list to do execute()'s, I cannot mark the older messages which were appended first as rejected.
– Maple Zhang
Jan 2 at 15:03
It could hold as a temporary solution but remember that each message is being appended into a list before it is passed into executemany() which means that if executemany() hits an exception and I start looping through each item in the list to do execute()'s, I cannot mark the older messages which were appended first as rejected.
– Maple Zhang
Jan 2 at 15:03
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53992049%2frabbitmq-with-python-acknowledging-and-rejecting-messages-individually-while-us%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53992049%2frabbitmq-with-python-acknowledging-and-rejecting-messages-individually-while-us%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Could you break your transactions into manageable sizes, then if you hit an exception, run through each message in the single transaction? 5m messages shouldn't be too horrible for that kind of paradigm, though it kind of negates the benefit of transacted sessions anyways
– C.Nivs
Dec 31 '18 at 23:43
Though
manageable
here is completely subjective– C.Nivs
Dec 31 '18 at 23:43
It could hold as a temporary solution but remember that each message is being appended into a list before it is passed into executemany() which means that if executemany() hits an exception and I start looping through each item in the list to do execute()'s, I cannot mark the older messages which were appended first as rejected.
– Maple Zhang
Jan 2 at 15:03