Camel: How to split and then aggregate when the number of items is less than the batch size
I have a Camel route that reads a file from S3 and then processes the input file as follows:
- Parse each row into a POJO (Student) using Bindy
- Split the output by body()
- Aggregate by an attribute of the body (semester) and a batch size of 2
- Invoke the persistence service to upload the records to the DB in the given batches
The problem is that with a batch size of 2 and an odd number of records, there is always one record that does not get saved.
The code provided is Kotlin, but it should not be very different from the equivalent Java code (bar the backslash in front of "${simple expression}" and the lack of semicolons to terminate statements).
If I set the batch size to 1, then every record is saved; otherwise the last record never gets saved.
I have checked the Composed Message Processor documentation a few times, but it doesn't seem to cover this particular scenario.
I have also set completionTimeout (and completionInterval) in addition to completionSize, but it does not make any difference.
Has anyone encountered this problem before?
val csvDataFormat = BindyCsvDataFormat(Student::class.java)

from("aws-s3://student-12-bucket?amazonS3Client=#amazonS3&delay=5000")
    .log("A new Student input file has been received in S3: '\${header.CamelAwsS3BucketName}/\${header.CamelAwsS3Key}'")
    .to("direct:move-input-s3-object-to-in-progress")
    .to("direct:process-s3-file")
    .to("direct:move-input-s3-object-to-completed")
    .end()

from("direct:process-s3-file")
    .unmarshal(csvDataFormat)
    .split(body())
        .streaming()
        .parallelProcessing()
    .aggregate(simple("\${body.semester}"), GroupedBodyAggregationStrategy())
        .completionSize(2)
        .bean(persistenceService)
    .end()
With an input CSV file containing seven (7) records, this is the output generated (with some added debug logging):
WARN 19540 --- [student-12-move] c.a.s.s.internal.S3AbortableInputStream : Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
INFO 19540 --- [student-12-move] student-workflow-main : A new Student input file has been received in S3: 'student-12-bucket/inbox/foo.csv'
INFO 19540 --- [student-12-move] move-input-s3-object-to-in-progress : Moving S3 file 'inbox/foo.csv' to 'in-progress' folder...
INFO 19540 --- [student-12-move] student-workflow-main : Moved input S3 file 'in-progress/foo.csv' to 'in-progress' folder...
INFO 19540 --- [student-12-move] pre-process-s3-file-records : Start saving to database...
DEBUG 19540 --- [read #7 - Split] c.b.i.d.s.StudentPersistenceServiceImpl : Saving record to database: Student(id=7, name=Student 7, semester=2nd, javaMarks=25)
DEBUG 19540 --- [read #7 - Split] c.b.i.d.s.StudentPersistenceServiceImpl : Saving record to database: Student(id=5, name=Student 5, semester=2nd, javaMarks=81)
DEBUG 19540 --- [read #3 - Split] c.b.i.d.s.StudentPersistenceServiceImpl : Saving record to database: Student(id=6, name=Student 6, semester=1st, javaMarks=15)
DEBUG 19540 --- [read #3 - Split] c.b.i.d.s.StudentPersistenceServiceImpl : Saving record to database: Student(id=2, name=Student 2, semester=1st, javaMarks=62)
DEBUG 19540 --- [read #2 - Split] c.b.i.d.s.StudentPersistenceServiceImpl : Saving record to database: Student(id=3, name=Student 3, semester=2nd, javaMarks=72)
DEBUG 19540 --- [read #2 - Split] c.b.i.d.s.StudentPersistenceServiceImpl : Saving record to database: Student(id=1, name=Student 1, semester=2nd, javaMarks=87)
INFO 19540 --- [student-12-move] device-group-workflow-main : End pre-processing S3 CSV file records...
INFO 19540 --- [student-12-move] move-input-s3-object-to-completed : Moving S3 file 'in-progress/foo.csv' to 'completed' folder...
INFO 19540 --- [student-12-move] device-group-workflow-main : Moved S3 file 'in-progress/foo.csv' to 'completed' folder...
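For completeness, the completionTimeout variant mentioned above only changed the aggregate step; it looked roughly like this (a sketch reconstructed from the description; the 5000 ms value is an assumption, not from the original route):

from("direct:process-s3-file")
    .unmarshal(csvDataFormat)
    .split(body())
        .streaming()
        .parallelProcessing()
    .aggregate(simple("\${body.semester}"), GroupedBodyAggregationStrategy())
        .completionSize(2)
        // flush a partial batch if no new exchange arrives within the timeout
        .completionTimeout(5000) // assumed value, not from the original post
        .bean(persistenceService)
    .end()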
kotlin apache-camel spring-camel
asked Jan 3 at 7:04 by Lex Luthor
The completionTimeout ought to trigger the last row when it times out. It would be strange if that didn't work.
– Claus Ibsen
Jan 3 at 12:27
It does exhibit the right behaviour if I replace simple("${body.semester}") with constant(true). It is probably a bug...
– Lex Luthor
Jan 5 at 1:20
What version of Camel are you using? The grouping key, whether body.semester or a constant, should not affect the timeout.
– Claus Ibsen
Jan 5 at 7:28
I am using the following components: camel.version = 2.23.0, spring-boot.version = 2.1.1.RELEASE, kotlin.version = 1.3.10, aws-java-sdk.version = 1.11.461
– Lex Luthor
Jan 5 at 23:06
And are you sure there is not something wrong in your bean that doesn't work with one record only? Have you tried adding a log after the aggregate to see that it logs something when the completion timeout is triggered? If it is still a problem, you can try to build a project on GitHub so it is easier for others to take a look, and make it possible to run easily without an AWS account.
– Claus Ibsen
Jan 7 at 20:20
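Following that suggestion, a log step placed directly after the aggregate would show whether any completion path ever fires for the last record (a sketch; CamelAggregatedCompletedBy is the exchange property the aggregator sets to report which condition completed the batch):

.aggregate(simple("\${body.semester}"), GroupedBodyAggregationStrategy())
    .completionSize(2)
    .completionTimeout(5000) // assumed value
    // logs once per completed batch, naming the completion trigger (size, timeout, ...)
    .log("Batch completed by \${exchangeProperty.CamelAggregatedCompletedBy}")
    .bean(persistenceService)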
1 Answer
If you need to complete your message immediately, you can specify a completion predicate based on the exchange properties set by the splitter. I've not tried this, but I think
.completionPredicate(simple("${exchangeProperty.CamelSplitComplete}"))
would process the last message.
My other concern is that you've set parallelProcessing on your splitter, which may mean that the messages aren't processed in order. Is it really the splitter you want the parallel processing applied to, or actually the aggregator? You don't seem to do anything with the split records except aggregate them and then process them, so it might be better to move the parallelProcessing instruction to the aggregator.
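Put together, the suggested changes would look roughly like this (untested, as noted above; parallelProcessing moves from the splitter to the aggregator, and the split-complete predicate is added alongside completionSize):

from("direct:process-s3-file")
    .unmarshal(csvDataFormat)
    .split(body())
        .streaming()
    .aggregate(simple("\${body.semester}"), GroupedBodyAggregationStrategy())
        .completionSize(2)
        // complete the open group when the splitter reports it has emitted the last row
        .completionPredicate(simple("\${exchangeProperty.CamelSplitComplete}"))
        .parallelProcessing()
        .bean(persistenceService)
    .end()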
answered Jan 3 at 9:01 (edited Jan 3 at 9:07) by Screwtape
That did not solve it, but it helped me understand the cause of the problem. The issue appears to be that when grouping by a certain attribute of the body, there may be some carry-over exchanges at the end for which none of the completion conditions will ever be true: exchange completion size == 2, or exchangeProperty.CamelSplitComplete == true. Not sure how to solve it yet...
– Lex Luthor
Jan 3 at 22:52
Maybe turn off parallel processing, as you can have out-of-order processing with it enabled.
– Claus Ibsen
Jan 6 at 9:51
I did turn off parallel processing during the split as suggested. The end result was the same, though...
– Lex Luthor
Jan 6 at 22:09
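One possible direction for the stranded carry-over groups described in this thread (not something suggested above, just a sketch): Camel's AggregateController, available since Camel 2.16, can force completion of all outstanding groups once the split has finished.

import org.apache.camel.processor.aggregate.DefaultAggregateController

val aggregateController = DefaultAggregateController()

from("direct:process-s3-file")
    .unmarshal(csvDataFormat)
    .split(body())
        .streaming()
    .aggregate(simple("\${body.semester}"), GroupedBodyAggregationStrategy())
        .completionSize(2)
        .aggregateController(aggregateController)
        .bean(persistenceService)
    .end()
    // runs on the original exchange after the splitter has emitted every row;
    // flushes any group still waiting for a second record
    .process { aggregateController.forceCompletionOfAllGroups() }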