Flink window operator checkpointing

I want to know how Flink checkpoints the window operator. How does it guarantee exactly-once semantics when recovering? For example, does it save the tuples in the current window along with the progress of the current window's processing? I would like to understand the detailed checkpoint and recovery process for the window operator.







apache-flink
















asked yesterday









Cheng Jiang

316
















  • There are some official docs which may be useful to you. Stream Checkpointing: ci.apache.org/projects/flink/flink-docs-release-1.7/internals/… Exactly-Once: flink.apache.org/features/2018/03/01/…
    – bupt_ljy
    yesterday












  • Thank you for your answer, but I want to know the detailed mechanism inside the window: how it handles the buffered contents and records the current state of processing. For example, does it record which elements have already been processed and skip them when it is restored?
    – Cheng Jiang
    yesterday


















1 Answer
All of Flink's stateful operators participate in the same checkpointing mechanism. When instructed to do so by the checkpoint coordinator (part of the job manager), the task managers initiate a checkpoint in each parallel instance of every source operator. The sources checkpoint their offsets and insert a checkpoint barrier into the stream, dividing it into the parts before and after the checkpoint. The barriers flow through the graph, and each stateful operator checkpoints its state once it has processed the stream up to the checkpoint barrier (in exactly-once mode, an operator with several input channels first waits until the barrier has arrived on all of them). The details are described at the link shared by @bupt_ljy.
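
To make the barrier idea concrete, here is a toy, single-process model in Python. It is not Flink's API, and every name in it is hypothetical. A source injects a barrier after a chosen number of records and notes the offset to resume from; a stateful operator (a running sum) snapshots its state when the barrier reaches it. What it illustrates is that the operator's snapshot reflects exactly the records before the barrier and none after it, so the pair (source offset, operator state) is consistent.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple, Union


@dataclass(frozen=True)
class Barrier:
    """In-band marker the toy source injects; carries the checkpoint id."""
    checkpoint_id: int


def run_with_barrier(records: List[int], barrier_after: int) -> Tuple[Dict[str, int], int]:
    """Toy model, not Flink code: returns (checkpoint snapshot, final operator state)."""
    snapshot: Dict[str, int] = {}
    stream: List[Union[int, Barrier]] = []

    # Source: emit records; after `barrier_after` of them, record the offset
    # to resume from and inject a barrier into the stream.
    for offset, value in enumerate(records):
        if offset == barrier_after:
            snapshot["source_offset"] = offset
            stream.append(Barrier(checkpoint_id=1))
        stream.append(value)

    # Stateful operator: a running sum. It snapshots its state when the barrier
    # arrives, i.e. after every record before the barrier and before any after it.
    running_sum = 0
    for element in stream:
        if isinstance(element, Barrier):
            snapshot["operator_state"] = running_sum
        else:
            running_sum += element
    return snapshot, running_sum


snapshot, final_sum = run_with_barrier([5, 1, 4, 2], barrier_after=2)
print(snapshot, final_sum)  # {'source_offset': 2, 'operator_state': 6} 12
```

With records [5, 1, 4, 2] and the barrier after two records, the snapshot pairs offset 2 with a sum of 6 (5 + 1), even though the operator goes on to reach 12.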



Thus these checkpoints capture the entire state of the distributed pipeline: the offsets into the input queues, as well as the state throughout the job graph that resulted from ingesting the data up to that point. When a failure occurs, the sources are rewound to the offsets in the most recently completed checkpoint, the state is restored from that same checkpoint, and processing resumes.
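
A similarly hypothetical sketch of recovery, again a toy model rather than Flink code: on a simulated crash, the latest completed checkpoint is restored, the "source" is rewound to its recorded offset, and everything from there on is replayed. Under these assumptions (a checkpoint counts as completed as soon as it is taken, and the replay does not fail again) the final state matches a failure-free run.

```python
from typing import Callable, List, Optional, Tuple

Checkpoint = Tuple[int, int]  # (source offset to resume from, operator state)


class SimulatedFailure(Exception):
    """Mimics a task crash at a given input offset."""


def consume(records: List[int], start: int, state: int,
            checkpoint_at: Optional[int] = None, fail_at: Optional[int] = None,
            on_checkpoint: Optional[Callable[[Checkpoint], None]] = None) -> int:
    for offset in range(start, len(records)):
        if offset == checkpoint_at and on_checkpoint is not None:
            # Barrier position: `state` covers exactly records[:offset].
            on_checkpoint((offset, state))
        if offset == fail_at:
            raise SimulatedFailure(offset)
        state += records[offset]
    return state


def run_with_recovery(records: List[int], checkpoint_at: int, fail_at: int) -> int:
    completed: List[Checkpoint] = []
    try:
        return consume(records, 0, 0, checkpoint_at, fail_at, completed.append)
    except SimulatedFailure:
        # Restore the latest completed checkpoint (or start over if there is none),
        # rewind the source to its offset, and replay; no second failure here.
        offset, state = completed[-1] if completed else (0, 0)
        return consume(records, offset, state)


records = [3, 1, 4, 1, 5, 9, 2, 6]
print(run_with_recovery(records, checkpoint_at=3, fail_at=6), sum(records))  # 31 31
```

Restoring the offset and the state together is what matters: resuming from offset 3 with the state as it stood at the crash (23) would count 1, 5 and 9 twice and end at 46.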



Given that during recovery the sources are rewound and replayed, "exactly once" means that the state managed by Flink is affected exactly once, not that the stream elements are processed exactly once.
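
The distinction between "state affected once" and "element processed once" can be shown with a toy operator (hypothetical names, not Flink's API) that has both a checkpointed counter and an uncheckpointed one. The uncheckpointed counter stands in for any side effect outside Flink-managed state, such as a log line or a metric.

```python
from typing import List


class CountingOperator:
    """Toy operator. `state` plays the role of Flink-managed (checkpointed) state;
    `invocations` stands in for anything outside it, which a restore does not
    roll back."""

    def __init__(self) -> None:
        self.state = 0
        self.invocations = 0

    def process(self, value: int) -> None:
        self.invocations += 1
        self.state += value


def replay_demo(records: List[int], checkpoint_at: int, fail_at: int) -> CountingOperator:
    assert 0 <= checkpoint_at < fail_at <= len(records)
    op = CountingOperator()
    saved_state = 0
    for offset in range(fail_at):                        # first attempt, crashes at fail_at
        if offset == checkpoint_at:
            saved_state = op.state                       # checkpoint covers records[:checkpoint_at]
        op.process(records[offset])
    op.state = saved_state                               # restore managed state only
    for offset in range(checkpoint_at, len(records)):    # rewind the source and replay
        op.process(records[offset])
    return op


op = replay_demo([1] * 10, checkpoint_at=4, fail_at=7)
print(op.state, op.invocations)  # 10 13
```

The managed state ends at 10, as if each of the ten records had been applied once, while `process` actually ran 13 times because records 4 to 6 were replayed.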



There's nothing particularly special about windows in this regard. Depending on the type of window function being applied, a window's contents are kept in managed ListState, ReducingState, AggregatingState, or FoldingState. As stream elements arrive and are assigned to a window, they are appended, reduced, aggregated, or folded into that state. Other components of the window API, including Triggers and ProcessWindowFunctions, can have checkpointed state as well. For example, CountTrigger uses ReducingState to keep track of how many elements have been assigned to the window, adding one to the count as each element is added to the window.
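
As a rough mental model only, here is a toy Python version of the managed state behind a keyed tumbling-window operator. The class and method names are hypothetical. It keeps either the full list of elements (the ListState case, as with a ProcessWindowFunction) or one running value (the ReducingState case), plus a per-window count like the one a count trigger keeps. Each entry is scoped by key and window, loosely mirroring how Flink namespaces window state by window. Because all of it is ordinary state, a snapshot is simply a copy.

```python
import copy
from typing import Any, Dict, Tuple

Window = Tuple[int, int]          # [start, end) of a tumbling window, in ms
Namespace = Tuple[str, Window]    # state is scoped per key and per window


class ToyWindowOperator:
    """Toy model of a keyed, tumbling-window operator's managed state.

    contents      ~ ListState (every element kept) or
                    ReducingState (one running value kept)
    trigger_count ~ the ReducingState a count trigger uses for its counter
    """

    def __init__(self, size_ms: int, pre_aggregate: bool) -> None:
        self.size_ms = size_ms
        self.pre_aggregate = pre_aggregate
        self.state: Dict[str, Dict[Namespace, Any]] = {
            "contents": {}, "trigger_count": {}}

    def assign_window(self, ts: int) -> Window:
        start = ts - ts % self.size_ms
        return (start, start + self.size_ms)

    def process_element(self, key: str, value: int, ts: int) -> None:
        ns = (key, self.assign_window(ts))
        contents = self.state["contents"]
        if self.pre_aggregate:
            contents[ns] = contents.get(ns, 0) + value    # reduce with (a, b) -> a + b
        else:
            contents.setdefault(ns, []).append(value)     # append to the list
        counts = self.state["trigger_count"]
        counts[ns] = counts.get(ns, 0) + 1                # count trigger: add one

    def snapshot(self) -> Dict[str, Dict[Namespace, Any]]:
        # A checkpoint of this toy operator is a deep copy of all its state.
        return copy.deepcopy(self.state)


listing = ToyWindowOperator(size_ms=1000, pre_aggregate=False)
reducing = ToyWindowOperator(size_ms=1000, pre_aggregate=True)
for value, ts in [(1, 100), (2, 900), (3, 1500)]:
    listing.process_element("a", value, ts)
    reducing.process_element("a", value, ts)
print(listing.snapshot()["contents"])   # {('a', (0, 1000)): [1, 2], ('a', (1000, 2000)): [3]}
print(reducing.snapshot()["contents"])  # {('a', (0, 1000)): 3, ('a', (1000, 2000)): 3}
```

The design point is that nothing records "which elements were processed": the snapshot is just the buffered or pre-aggregated contents (and trigger state) at the moment the barrier arrived.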



In the case where the window function is a ProcessWindowFunction, all of the elements assigned to the window are saved in Flink state and are passed in an Iterable to the ProcessWindowFunction when the window is triggered. That function iterates over the contents and produces a result. The internal state of the ProcessWindowFunction (for example, how far it has iterated) is not checkpointed; if the job fails while the ProcessWindowFunction is executing, the job will resume from the most recently completed checkpoint.

This involves rewinding to a point before the window received the event that triggered the firing. That event can't be included in the checkpoint, because the job failed before the window operator could process any checkpoint barrier following it. Sooner or later the window will again reach the point where it is triggered, and the ProcessWindowFunction will be called again, with the same window contents it received the first time, and hopefully this time it won't fail. (Note that I've ignored the case of processing-time windows, which do not behave deterministically.)
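
Applied to the example from the comments (window A receives 1 through 6, with a barrier after 3), a toy model suggests what actually gets saved. This is a hypothetical simulation, not Flink code: the firing is modeled as a watermark passing the window's end, and the window function is made to crash on its first call. The checkpoint holds only [1, 2, 3], there is no "progress within a firing" to restore, and after recovery the function is called again with all six elements.

```python
from typing import Callable, List, Optional, Tuple, Union

BARRIER = "barrier"        # checkpoint barrier
WATERMARK = "watermark"    # a watermark that passes the end of window A
Element = Union[int, str]
Checkpoint = Tuple[int, List[int]]   # (stream position to resume from, window contents)


class SimulatedFailure(Exception):
    pass


def run(stream: List[Element], start: int, window: List[int],
        checkpoints: List[Checkpoint],
        window_function: Callable[[List[int]], int]) -> Optional[int]:
    for i in range(start, len(stream)):
        element = stream[i]
        if element == BARRIER:
            # The snapshot holds what the window has buffered so far; no firing
            # is in progress, so there is no iteration position to save.
            checkpoints.append((i + 1, list(window)))
        elif element == WATERMARK:
            return window_function(list(window))   # the window fires
        else:
            window.append(element)
    return None


calls: List[List[int]] = []


def flaky_sum(elements: List[int]) -> int:
    """Stands in for a ProcessWindowFunction that crashes on its first call."""
    calls.append(elements)
    if len(calls) == 1:
        raise SimulatedFailure("crash while window A is firing")
    return sum(elements)


stream: List[Element] = [1, 2, 3, BARRIER, 4, 5, 6, WATERMARK]
checkpoints: List[Checkpoint] = []
try:
    result = run(stream, 0, [], checkpoints, flaky_sum)
except SimulatedFailure:
    resume_at, restored = checkpoints[-1]            # restore window contents [1, 2, 3]
    result = run(stream, resume_at, list(restored), checkpoints, flaky_sum)

print(checkpoints)  # [(4, [1, 2, 3])]
print(calls)        # [[1, 2, 3, 4, 5, 6], [1, 2, 3, 4, 5, 6]]
print(result)       # 21
```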



When a ProcessWindowFunction uses managed (checkpointed) state, that state is for remembering things between firings, not within a single firing. For example, a window that allows late events might store the result it previously reported, and then issue an update for each late event.
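
A toy sketch of that late-event pattern, for a single key, with hypothetical names throughout. In Flink the remembered result would live in per-window keyed state, which a ProcessWindowFunction can reach through its `Context.windowState()`; here it is just a dict. The sketch assumes event-time tumbling windows that fire once the watermark reaches the window's last timestamp, and drops elements whose window is older than the allowed lateness. It does not model clearing state once the lateness expires.

```python
from typing import Dict, List, Optional, Tuple

Window = Tuple[int, int]   # [start, end) of a tumbling event-time window


class LateUpdatingWindow:
    """Toy event-time tumbling window with allowed lateness, for a single key.

    `contents` models the window's ListState. `last_reported` models per-window
    state kept between firings, holding the result emitted last time so that a
    late firing can be reported as an update.
    """

    def __init__(self, size: int, allowed_lateness: int) -> None:
        self.size = size
        self.allowed_lateness = allowed_lateness
        self.watermark = float("-inf")
        self.contents: Dict[Window, List[int]] = {}
        self.last_reported: Dict[Window, int] = {}
        self.output: List[Tuple[Window, int, Optional[int]]] = []  # (window, result, previous)

    def window_of(self, ts: int) -> Window:
        start = ts - ts % self.size
        return (start, start + self.size)

    def fire(self, w: Window) -> None:
        result = sum(self.contents[w])
        previous = self.last_reported.get(w)   # None on the on-time firing
        self.output.append((w, result, previous))
        self.last_reported[w] = result         # remembered until the next firing

    def on_element(self, value: int, ts: int) -> None:
        w = self.window_of(ts)
        max_ts = w[1] - 1
        if self.watermark >= max_ts + self.allowed_lateness:
            return                             # beyond allowed lateness: dropped
        self.contents.setdefault(w, []).append(value)
        if self.watermark >= max_ts:
            self.fire(w)                       # late but allowed: fire again

    def on_watermark(self, wm: int) -> None:
        for w in sorted(self.contents):
            if self.watermark < w[1] - 1 <= wm:
                self.fire(w)                   # on-time firing
        self.watermark = wm


win = LateUpdatingWindow(size=10, allowed_lateness=5)
win.on_element(1, ts=2)
win.on_element(2, ts=7)
win.on_watermark(10)      # on-time firing of (0, 10) with result 3
win.on_element(4, ts=5)   # late but within lateness: result 7, previous 3
win.on_watermark(20)
win.on_element(8, ts=3)   # too late: dropped
print(win.output)  # [((0, 10), 3, None), ((0, 10), 7, 3)]
```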















edited 18 hours ago

























answered yesterday









David Anderson

4,834
















  • 1. So what you mean is that the window saves its contents as state and also records the tuple currently being processed. During failure recovery, the state is loaded to restore the window's contents, along with the state of the tuple that was being processed before the failure.
    – Cheng Jiang
    yesterday










  • 2. For example, say I have a window A with contents [1, 2, 3, 4, 5, 6], and I am using a ProcessWindowFunction. When window A receives the barrier after 3 and completes the checkpoint, this checkpoint saves the contents [1, 2, 3, 4, 5, 6] and the fact that tuple 3 has been processed. However, a failure occurs while processing 5. Then, when recovering, window A will reload [1, 2, 3, 4, 5, 6] and start processing again from 3. Am I correct?
    – Cheng Jiang
    yesterday












  • No, that's not correct. I've updated my answer; hopefully it is clearer now.
    – David Anderson
    19 hours ago

















