Flink window operator checkpointing

I want to know how Flink checkpoints the window operator. How does it ensure exactly-once semantics when recovering? For example, does it save the tuples in the current window and the progress of the current window's processing? I want to know the detailed process of the window operator's checkpoint and recovery.
apache-flink
There are some official docs which may be useful to you. Stream Checkpointing: ci.apache.org/projects/flink/flink-docs-release-1.7/internals/… Exactly-Once: flink.apache.org/features/2018/03/01/…
– bupt_ljy
yesterday
Thank you for your answer, but I want to know the detailed mechanism inside the window, how to handle the contents of the cache and record the current state of the process. For example, it will record which has been processed, and skip when it is restored.
– Cheng Jiang
yesterday
asked yesterday


Cheng Jiang
316
1 Answer
All of Flink's stateful operators participate in the same checkpointing mechanism. When instructed to do so by the checkpoint coordinator (part of the job manager), the task managers initiate a checkpoint in each parallel instance of every source operator. The sources checkpoint their offsets and insert a checkpoint barrier into the stream. This divides the stream into the parts before and after the checkpoint. The barriers flow through the graph, and each stateful operator checkpoints its state upon having processed the stream up to the checkpoint barrier. The details are described at the link shared by @bupt_ljy.
Thus these checkpoints capture the entire state of the distributed pipeline, recording offsets into the input queues as well as the state throughout the job graph that has resulted from having ingested the data up to that point. When a failure occurs, the sources are rewound, the state is restored, and processing is resumed.
Given that during recovery the sources are rewound and replayed, "exactly once" means that the state managed by Flink is affected exactly once, not that the stream elements are processed exactly once.
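To make the distinction concrete, here is a minimal sketch in plain Python (not Flink code). It assumes a toy pipeline with one source and one stateful operator that keeps a running sum; the names ToyPipeline, run, checkpoint_at and fail_at are hypothetical and exist only for this illustration. A checkpoint stores the source offset together with the operator state, and recovery rewinds to that pair.

```python
from dataclasses import dataclass


@dataclass
class ToyPipeline:
    """One source feeding one stateful operator (a running sum)."""
    records: list
    offset: int = 0            # source position, saved in a checkpoint
    total: int = 0             # operator state, saved in a checkpoint
    processed: int = 0         # number of process calls; NOT part of the state
    snapshot: tuple = (0, 0)   # last completed checkpoint: (offset, total)

    def step(self):
        self.total += self.records[self.offset]
        self.offset += 1
        self.processed += 1

    def checkpoint(self):
        # Offset and state are captured at the same logical point in the
        # stream, which is the role the barrier plays in the real system.
        self.snapshot = (self.offset, self.total)

    def recover(self):
        # Rewind the source and restore the state from the last checkpoint.
        self.offset, self.total = self.snapshot


def run(records, checkpoint_at, fail_at=None):
    pipeline = ToyPipeline(records)
    failed = False
    while pipeline.offset < len(records):
        if pipeline.offset == checkpoint_at:
            pipeline.checkpoint()
        if fail_at is not None and not failed and pipeline.offset == fail_at:
            failed = True          # simulate a single crash
            pipeline.recover()
            continue
        pipeline.step()
    return pipeline
```

With records [1, 2, 3, 4, 5, 6], a checkpoint at offset 2 and a crash at offset 4, the run with the failure calls step() eight times instead of six, yet both runs end with total == 21. The elements at offsets 2 and 3 are processed twice, but their effect on the state is counted once.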
There's nothing particularly special about windows in this regard. Depending on the type of window function being applied, a window's contents are kept in an element of managed ListState, ReducingState, AggregatingState, or FoldingState. As stream elements arrive and are assigned to a window, they are appended, reduced, aggregated, or folded into that state. Other components of the window API, including Triggers and ProcessWindowFunctions, can have state that is checkpointed as well. For example, CountTrigger uses ReducingState to keep track of how many elements have been assigned to the window, adding one to the count as each element is added to the window.
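As a rough illustration of the difference between keeping every element and keeping only a reduced value, here is a plain-Python toy model. The classes below borrow the names ListState and ReducingState but are hypothetical stand-ins, not Flink's interfaces, and ToyCountTrigger only imitates the counting behaviour described above.

```python
class ListState:
    """Keeps every element assigned to the window."""

    def __init__(self):
        self.items = []

    def add(self, element):
        self.items.append(element)

    def snapshot(self):
        # What a checkpoint would record for this window: all elements.
        return list(self.items)


class ReducingState:
    """Keeps a single value, combining each new element into it."""

    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.value = None

    def add(self, element):
        if self.value is None:
            self.value = element
        else:
            self.value = self.reduce_fn(self.value, element)

    def clear(self):
        self.value = None

    def snapshot(self):
        # What a checkpoint would record: only the reduced value.
        return self.value


class ToyCountTrigger:
    """Fires once max_count elements have been seen, then resets its count."""

    def __init__(self, max_count):
        self.max_count = max_count
        self.count = ReducingState(lambda a, b: a + b)

    def on_element(self):
        self.count.add(1)   # add one to the count for each element
        if self.count.value >= self.max_count:
            self.count.clear()
            return True     # fire
        return False        # keep waiting
```

Feeding the elements 3, 1, 4, 1, 5 into a ListState leaves five values to checkpoint, while a ReducingState built with max holds only 5. A ToyCountTrigger(3) fed the same five elements fires on the third one and is left holding a count of 2, which is the part of the trigger that would be checkpointed.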
In the case where the window function is a ProcessWindowFunction, all of the elements assigned to the window are saved in Flink state, and are passed in an Iterable to the ProcessWindowFunction when the window is triggered. That function iterates over the contents and produces a result. The internal state of the ProcessWindowFunction is not checkpointed; if the job fails during the execution of the ProcessWindowFunction, the job will resume from the most recently completed checkpoint. This will involve rewinding back to a time before the window received the event that triggered the window firing (that event can't be included in the checkpoint, because a checkpoint barrier following it cannot have had its effect yet). Sooner or later the window will again reach the point where it is triggered and the ProcessWindowFunction will be called again, with the same window contents it received the first time, and hopefully this time it won't fail. (Note that I've ignored the case of processing-time windows, which do not behave deterministically.)
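The scenario from the comments (a window with contents [1, 2, 3, 4, 5, 6] and a failure while processing 5) can be sketched as a plain-Python toy model. Everything here is an assumption made for illustration: run_window is a hypothetical helper, the window fires when it holds window_size elements, and the crash is simulated with an exception on the fifth element of the first firing.

```python
def run_window(records, window_size, checkpoint_at, fail_first_firing):
    buffer = []            # window contents (checkpointed state)
    offset = 0             # source position (checkpointed)
    snapshot = (0, [])     # last completed checkpoint
    calls = []             # contents seen by each window function call
    results = []
    should_fail = fail_first_firing

    def window_function(contents):
        nonlocal should_fail
        calls.append(list(contents))
        partial = 0
        for i, x in enumerate(contents):
            partial += x   # progress inside the call is a local variable only
            if should_fail and i == 4:
                should_fail = False
                raise RuntimeError("simulated crash")
        return partial

    while offset < len(records):
        if offset == checkpoint_at:
            snapshot = (offset, list(buffer))
        buffer.append(records[offset])
        offset += 1
        if len(buffer) == window_size:
            try:
                results.append(window_function(buffer))
                buffer = []
            except RuntimeError:
                # Recovery: rewind the source and restore the window contents.
                # No record exists of how far the failed call had progressed.
                offset, saved = snapshot
                buffer = list(saved)
    return calls, results
```

Running run_window([1, 2, 3, 4, 5, 6], window_size=6, checkpoint_at=3, fail_first_firing=True) restores the buffer to [1, 2, 3], replays 4, 5 and 6, and calls the window function a second time with the full [1, 2, 3, 4, 5, 6]. The second call starts from the first element, not from 3 or 5, and exactly one result (21) is produced.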
When a ProcessWindowFunction uses managed/checkpointed state, it is used to remember things between firings, not within a single firing. For example, a window that allows late events might want to store the result previously reported, and then issue an update for each late event.
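A small plain-Python sketch of that late-event case follows. The class LateUpdatingWindowFunction and its dictionary are hypothetical; the dictionary merely stands in for per-window state that Flink would checkpoint, and the sum is an arbitrary choice of result.

```python
class LateUpdatingWindowFunction:
    """Remembers the last reported result per window across firings."""

    def __init__(self):
        # Stand-in for checkpointed per-window state (hypothetical).
        self.previous = {}

    def process(self, window_id, contents):
        new_result = sum(contents)
        old_result = self.previous.get(window_id)
        self.previous[window_id] = new_result
        if old_result is None:
            return ("initial", new_result)
        # A later firing, for example one caused by a late event,
        # reports both the old and the new result.
        return ("update", old_result, new_result)
```

The first firing for a window with contents [1, 2, 3] returns ("initial", 6). If a late element 4 causes the same window to fire again, the call returns ("update", 6, 10). The remembered value spans firings; nothing about the position inside a single call is stored.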
1. So you mean that the window saves its contents as state and also records the tuple currently being processed? Then, at the time of failure recovery, the state is loaded to restore the contents of the window, and the position of the tuple being processed before the failure is restored as well.
– Cheng Jiang
yesterday
2. For example, I have a window A with content [1, 2, 3, 4, 5, 6]. I am using ProcessWindowFunctions. When window A receives the barrier at 3 and completes the checkpoint, then this checkpoint saves the contents {1, 2, 3, 4, 5, 6} and the processed tuple 3. However, a failure occurred while processing 5. Then, when recovering, window A will reload [1, 2, 3, 4, 5, 6] and start processing again from 3. Am I correct?
– Cheng Jiang
yesterday
No, that's not correct. I've updated my answer; hopefully it is clearer now.
– David Anderson
19 hours ago
edited 18 hours ago
answered yesterday


David Anderson
4,83421120