How to specify retention period of join window?
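I want to join two streams, and I have set the join window to 25 hours because the records to be joined can be at most 24 hours apart.
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;

// Records to be joined can be at most 24 hours apart; 25 hours leaves some headroom.
final long JOIN_WINDOW = TimeUnit.HOURS.toMillis(25);
kstream.join(
    runsheetIdStream,
    (jt, r) -> { jt.setDate(r.getStart_date()); return jt; },
    JoinWindows.of(JOIN_WINDOW),
    Joined.with(Serdes.Long(), jobTransactionSerde, runsheetSerde));
This throws the following exception: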
org.apache.kafka.streams.errors.TopologyException: Invalid topology: The retention period of the join window KSTREAM-JOINTHIS-0000000016-store must be no smaller than its window size.
How do I increase the retention period?
apache-kafka apache-kafka-streams
edited Dec 31 '18 at 18:09 by Jacek Laskowski
asked Dec 31 '18 at 14:06 by Ankur rana
1 Answer
When you join with JoinWindows.of(JOIN_WINDOW), you implicitly define the metadata of the underlying window state stores, i.e. their window size and retention period.
From the javadoc of JoinWindows.of:
Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is max timeDifference earlier or later than the timestamp of the record from the primary stream.
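To make the quoted semantics concrete, here is a small stdlib-only Java sketch. JoinWindowModel, joinable and windowSizeMs are hypothetical names, not Kafka Streams API. The sketch assumes that JoinWindows.of(timeDifference) applies the same time difference before and after each record, so the window spans twice that difference.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, stdlib-only model of the JoinWindows.of(timeDifference) semantics quoted above.
// It does not call Kafka Streams; it only illustrates the arithmetic.
public class JoinWindowModel {

    // Records with the same key join if the secondary timestamp is at most
    // timeDifferenceMs earlier or later than the primary timestamp.
    static boolean joinable(long primaryTs, long secondaryTs, long timeDifferenceMs) {
        return Math.abs(secondaryTs - primaryTs) <= timeDifferenceMs;
    }

    // The window reaches timeDifferenceMs before and timeDifferenceMs after the
    // primary record, so its total size is twice the time difference.
    static long windowSizeMs(long timeDifferenceMs) {
        long beforeMs = timeDifferenceMs;
        long afterMs = timeDifferenceMs;
        return beforeMs + afterMs;
    }

    public static void main(String[] args) {
        long joinWindow = TimeUnit.HOURS.toMillis(25); // JOIN_WINDOW from the question
        long hour = TimeUnit.HOURS.toMillis(1);

        System.out.println(joinable(0L, 24 * hour, joinWindow));  // true
        System.out.println(joinable(0L, -24 * hour, joinWindow)); // true
        System.out.println(joinable(0L, 26 * hour, joinWindow));  // false
        System.out.println(TimeUnit.MILLISECONDS.toHours(windowSizeMs(joinWindow))); // 50
    }
}
```

With JOIN_WINDOW = 25 hours, records 24 hours apart join as intended, but the window itself is 50 hours wide, and that width is what the store's retention period has to accommodate.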
Before Kafka Streams 2.1.0, this so-called retention period (a.k.a. window maintain duration) was specified using until:
Set the window maintain duration (retention time) in milliseconds. This retention time is a guaranteed lower bound for how long a window will be maintained.
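JoinWindows.of(timeDifference) uses the same time difference before and after each record, so the window size is twice the time difference, i.e. 50 hours for a JOIN_WINDOW of 25 hours. The retention, however, defaults to 1 day (can't find the reference at the moment), which is smaller than the window size. That's the reason for the exception.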
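The check behind the exception can be sketched with a stdlib-only Java model. RetentionCheckModel, ASSUMED_DEFAULT_RETENTION_MS and isValid are hypothetical names; the sketch assumes the 1-day default retention mentioned above and a window size of before + after, i.e. twice the time difference passed to JoinWindows.of.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, stdlib-only model of the check behind the TopologyException in the question.
// ASSUMPTION: the default retention is 1 day; this value is not read from Kafka Streams.
public class RetentionCheckModel {

    static final long ASSUMED_DEFAULT_RETENTION_MS = TimeUnit.DAYS.toMillis(1);

    // A join store is valid only if it keeps records at least as long as the window is wide.
    static boolean isValid(long windowSizeMs, long retentionMs) {
        return retentionMs >= windowSizeMs;
    }

    public static void main(String[] args) {
        long hour = TimeUnit.HOURS.toMillis(1);

        // 25 h before + 25 h after = 50 h window, larger than the assumed 1-day default.
        System.out.println(isValid(2 * 25 * hour, ASSUMED_DEFAULT_RETENTION_MS)); // false
        // Even a 24 h time difference (48 h window) exceeds the assumed default.
        System.out.println(isValid(2 * 24 * hour, ASSUMED_DEFAULT_RETENTION_MS)); // false
        // A 12 h time difference gives a 24 h window, which just fits.
        System.out.println(isValid(2 * 12 * hour, ASSUMED_DEFAULT_RETENTION_MS)); // true
        // Raising the retention to the window size makes the 25 h join valid.
        System.out.println(isValid(2 * 25 * hour, 2 * 25 * hour)); // true
    }
}
```

Shrinking the time difference is not an option here, since records 24 hours apart would then no longer join; the retention has to grow to at least the window size instead.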
As of Kafka Streams 2.1.0, until is deprecated. For windowed aggregations, the retention is configured through the Materialized API instead:
Used to describe how a StateStore should be materialized. You can either provide a custom StateStore backend through one of the provided methods accepting a supplier or use the default RocksDB backends by providing just a store name.
Materialized gives you full control over the underlying state store and offers withRetention(java.time.Duration retention):
Configure retention period for window and session stores.
Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.
Note, however, that in 2.1 a KStream-KStream join takes a Joined rather than a Materialized, so for a stream-stream join the deprecation notice of JoinWindows#until points to JoinWindows#grace(Duration) instead.
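The note above boils down to simple arithmetic: the retention must cover the window size plus any grace period. Here is a hypothetical stdlib-only sketch; MinRetentionModel and minRetention are illustrative names, not Kafka Streams API, and the 50-hour window assumes the question's 25-hour time difference applied before and after each record.

```java
import java.time.Duration;

// Hypothetical, stdlib-only helper that turns the note above into arithmetic:
// retention >= (window-end - window-start) + grace period.
public class MinRetentionModel {

    static Duration minRetention(Duration windowSize, Duration gracePeriod) {
        return windowSize.plus(gracePeriod);
    }

    public static void main(String[] args) {
        Duration timeDifference = Duration.ofHours(25);       // JOIN_WINDOW from the question
        Duration windowSize = timeDifference.multipliedBy(2); // before + after

        System.out.println(minRetention(windowSize, Duration.ZERO));       // PT50H
        System.out.println(minRetention(windowSize, Duration.ofHours(1))); // PT51H
    }
}
```

The resulting value is what you would configure as the retention: for windowed aggregations via Materialized#withRetention, or, for a stream-stream join on versions before 2.1, via JoinWindows#until(long), e.g. JoinWindows.of(JOIN_WINDOW).until(2 * JOIN_WINDOW). until itself rejects any value smaller than the window size.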
edited Jan 1 at 17:00
answered Dec 31 '18 at 18:06 by Jacek Laskowski