Wrap operator in RxJS so it can be applied on materialized stream

Multi tool use
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty{ height:90px;width:728px;box-sizing:border-box;
}
I'm looking for a way to trace value 'route' among stream operators. I have a materialized stream with additional metadata on Notification object (ex. valueId
property). It's definition will look something like this:
const x = stream.pipe(
materialize(),
map(x => Object.assign(x, {valueId: randomInt()}))
);
Now I need to wrap operators that are applied to x
. Let's say I need to use map(x => x * 2)
, but I cannot do it like this:
x.pipe(dematerialize(), map(x => x * 2))
Because I will lose my metadata. How do I make a wrap function, that will apply to any operator and will still preserve my additional metadata?
x.pipe(wrap(map(x => x * 2)))
I thought about something like this:
function wrap<T, R>(
operator: OperatorFunction<T, R>
): (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source =>
source.pipe(
switchMap(x =>
of(x).pipe(
dematerialize(),
operator,
materialize(),
map(z => Object.assign(z, { valueId: x.valueId }))
)
)
);
}
But it generates fake complete messages from of()
.
Sample: https://stackblitz.com/edit/rxjs-fhv54p
rxjs reactivex
add a comment |
I'm looking for a way to trace value 'route' among stream operators. I have a materialized stream with additional metadata on Notification object (ex. valueId
property). It's definition will look something like this:
const x = stream.pipe(
materialize(),
map(x => Object.assign(x, {valueId: randomInt()}))
);
Now I need to wrap operators that are applied to x
. Let's say I need to use map(x => x * 2)
, but I cannot do it like this:
x.pipe(dematerialize(), map(x => x * 2))
Because I will lose my metadata. How do I make a wrap function, that will apply to any operator and will still preserve my additional metadata?
x.pipe(wrap(map(x => x * 2)))
I thought about something like this:
function wrap<T, R>(
operator: OperatorFunction<T, R>
): (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source =>
source.pipe(
switchMap(x =>
of(x).pipe(
dematerialize(),
operator,
materialize(),
map(z => Object.assign(z, { valueId: x.valueId }))
)
)
);
}
But it generates fake complete messages from of()
.
Sample: https://stackblitz.com/edit/rxjs-fhv54p
rxjs reactivex
add a comment |
I'm looking for a way to trace value 'route' among stream operators. I have a materialized stream with additional metadata on Notification object (ex. valueId
property). It's definition will look something like this:
const x = stream.pipe(
materialize(),
map(x => Object.assign(x, {valueId: randomInt()}))
);
Now I need to wrap operators that are applied to x
. Let's say I need to use map(x => x * 2)
, but I cannot do it like this:
x.pipe(dematerialize(), map(x => x * 2))
Because I will lose my metadata. How do I make a wrap function, that will apply to any operator and will still preserve my additional metadata?
x.pipe(wrap(map(x => x * 2)))
I thought about something like this:
function wrap<T, R>(
operator: OperatorFunction<T, R>
): (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source =>
source.pipe(
switchMap(x =>
of(x).pipe(
dematerialize(),
operator,
materialize(),
map(z => Object.assign(z, { valueId: x.valueId }))
)
)
);
}
But it generates fake complete messages from of()
.
Sample: https://stackblitz.com/edit/rxjs-fhv54p
rxjs reactivex
I'm looking for a way to trace value 'route' among stream operators. I have a materialized stream with additional metadata on Notification object (ex. valueId
property). It's definition will look something like this:
const x = stream.pipe(
materialize(),
map(x => Object.assign(x, {valueId: randomInt()}))
);
Now I need to wrap operators that are applied to x
. Let's say I need to use map(x => x * 2)
, but I cannot do it like this:
x.pipe(dematerialize(), map(x => x * 2))
Because I will lose my metadata. How do I make a wrap function, that will apply to any operator and will still preserve my additional metadata?
x.pipe(wrap(map(x => x * 2)))
I thought about something like this:
function wrap<T, R>(
operator: OperatorFunction<T, R>
): (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source =>
source.pipe(
switchMap(x =>
of(x).pipe(
dematerialize(),
operator,
materialize(),
map(z => Object.assign(z, { valueId: x.valueId }))
)
)
);
}
But it generates fake complete messages from of()
.
Sample: https://stackblitz.com/edit/rxjs-fhv54p
rxjs reactivex
rxjs reactivex
edited Dec 27 '18 at 22:21
Łukasz Szcześniak
asked Dec 23 '18 at 20:33
Łukasz SzcześniakŁukasz Szcześniak
573516
573516
add a comment |
add a comment |
3 Answers
3
active
oldest
votes
The issue in your approach using of
is that the complete notifications of of
are materialized and passed as value
to the observer of source
.
Try this:
function wrap<T, R>(op: OperatorFunction<T, R>):
(source: Observable<any>) => Observable<any> {
return source => {
return source.pipe(
switchMap(x => of(x)
.pipe(
dematerialize(),
op,
map(y => ({value: y, uuid: x.uuid}))
)
),
materialize(),
map((x: any) => Object.assign(x, {
value: x.value ? x.value.value : undefined,
uuid: x.value ? x.value.uuid: undefined
})),
)
}
}
Demo: https://stackblitz.com/edit/rxjs-gxj7zl
Yeah, you are right. Although this idea is not working forbuffer
-like operators and it breaks after first non-value notification (complete or error, because they cannot be mapped).
– Łukasz Szcześniak
Jan 5 at 22:40
add a comment |
You can do something like this
const stream$ = stream$.pipe(map((data=> ({...data, x: data.x + 1}))))
Or move it to a wrap function
const mapProp = (propName, fn) => stream$.pipe(map((data=> ({...data, [propName]: fn(data[propName])}))))
//then
const stream$ = stream$.pipe(mapProp('x', x => x+1 ))
if you want to use it for something else but map
const mapProp = (propName, fn) => stream$.pipe(
mergeMap(data =>
of(data[propName])
.pipe(
fn,
map(newPropValue => ({ ...data, propName: newPropValue })
)
)
)
//Usage
const stream$ = stream$.pipe(mapProp('x', map(x => x+1)))
As I've said in question,of()
generates artificial complete messages.
– Łukasz Szcześniak
Dec 24 '18 at 12:28
add a comment |
For now I've came up with an idea that "preserves" data using wrap function state. I don't really like it's 'side effect' nature, although it's safest implementation I've found so far.
export function wrap<T, R>(
operator: OperatorFunction<T, R>,
) => (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source => {
let metadata: { stepId: number; streamId: number; valueId: number };
return source.pipe(
tap(
({ valueId, stepId, streamId }) =>
(metadata = { valueId, streamId, stepId: stepId + 1 }),
),
dematerialize(),
operator,
materialize(),
map(x => Object.assign(x, metadata, { timestamp: Date.now() })),
);
};
}
But it's not working, because metadata is getting overridden randomly.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53906967%2fwrap-operator-in-rxjs-so-it-can-be-applied-on-materialized-stream%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
3 Answers
3
active
oldest
votes
3 Answers
3
active
oldest
votes
active
oldest
votes
active
oldest
votes
The issue in your approach using of
is that the complete notifications of of
are materialized and passed as value
to the observer of source
.
Try this:
function wrap<T, R>(op: OperatorFunction<T, R>):
(source: Observable<any>) => Observable<any> {
return source => {
return source.pipe(
switchMap(x => of(x)
.pipe(
dematerialize(),
op,
map(y => ({value: y, uuid: x.uuid}))
)
),
materialize(),
map((x: any) => Object.assign(x, {
value: x.value ? x.value.value : undefined,
uuid: x.value ? x.value.uuid: undefined
})),
)
}
}
Demo: https://stackblitz.com/edit/rxjs-gxj7zl
Yeah, you are right. Although this idea is not working forbuffer
-like operators and it breaks after first non-value notification (complete or error, because they cannot be mapped).
– Łukasz Szcześniak
Jan 5 at 22:40
add a comment |
The issue in your approach using of
is that the complete notifications of of
are materialized and passed as value
to the observer of source
.
Try this:
function wrap<T, R>(op: OperatorFunction<T, R>):
(source: Observable<any>) => Observable<any> {
return source => {
return source.pipe(
switchMap(x => of(x)
.pipe(
dematerialize(),
op,
map(y => ({value: y, uuid: x.uuid}))
)
),
materialize(),
map((x: any) => Object.assign(x, {
value: x.value ? x.value.value : undefined,
uuid: x.value ? x.value.uuid: undefined
})),
)
}
}
Demo: https://stackblitz.com/edit/rxjs-gxj7zl
Yeah, you are right. Although this idea is not working forbuffer
-like operators and it breaks after first non-value notification (complete or error, because they cannot be mapped).
– Łukasz Szcześniak
Jan 5 at 22:40
add a comment |
The issue in your approach using of
is that the complete notifications of of
are materialized and passed as value
to the observer of source
.
Try this:
function wrap<T, R>(op: OperatorFunction<T, R>):
(source: Observable<any>) => Observable<any> {
return source => {
return source.pipe(
switchMap(x => of(x)
.pipe(
dematerialize(),
op,
map(y => ({value: y, uuid: x.uuid}))
)
),
materialize(),
map((x: any) => Object.assign(x, {
value: x.value ? x.value.value : undefined,
uuid: x.value ? x.value.uuid: undefined
})),
)
}
}
Demo: https://stackblitz.com/edit/rxjs-gxj7zl
The issue in your approach using of
is that the complete notifications of of
are materialized and passed as value
to the observer of source
.
Try this:
function wrap<T, R>(op: OperatorFunction<T, R>):
(source: Observable<any>) => Observable<any> {
return source => {
return source.pipe(
switchMap(x => of(x)
.pipe(
dematerialize(),
op,
map(y => ({value: y, uuid: x.uuid}))
)
),
materialize(),
map((x: any) => Object.assign(x, {
value: x.value ? x.value.value : undefined,
uuid: x.value ? x.value.uuid: undefined
})),
)
}
}
Demo: https://stackblitz.com/edit/rxjs-gxj7zl
edited Jan 16 at 23:32
answered Jan 3 at 21:23
jaljal
6481513
6481513
Yeah, you are right. Although this idea is not working forbuffer
-like operators and it breaks after first non-value notification (complete or error, because they cannot be mapped).
– Łukasz Szcześniak
Jan 5 at 22:40
add a comment |
Yeah, you are right. Although this idea is not working forbuffer
-like operators and it breaks after first non-value notification (complete or error, because they cannot be mapped).
– Łukasz Szcześniak
Jan 5 at 22:40
Yeah, you are right. Although this idea is not working for
buffer
-like operators and it breaks after first non-value notification (complete or error, because they cannot be mapped).– Łukasz Szcześniak
Jan 5 at 22:40
Yeah, you are right. Although this idea is not working for
buffer
-like operators and it breaks after first non-value notification (complete or error, because they cannot be mapped).– Łukasz Szcześniak
Jan 5 at 22:40
add a comment |
You can do something like this
const stream$ = stream$.pipe(map((data=> ({...data, x: data.x + 1}))))
Or move it to a wrap function
const mapProp = (propName, fn) => stream$.pipe(map((data=> ({...data, [propName]: fn(data[propName])}))))
//then
const stream$ = stream$.pipe(mapProp('x', x => x+1 ))
if you want to use it for something else but map
const mapProp = (propName, fn) => stream$.pipe(
mergeMap(data =>
of(data[propName])
.pipe(
fn,
map(newPropValue => ({ ...data, propName: newPropValue })
)
)
)
//Usage
const stream$ = stream$.pipe(mapProp('x', map(x => x+1)))
As I've said in question,of()
generates artificial complete messages.
– Łukasz Szcześniak
Dec 24 '18 at 12:28
add a comment |
You can do something like this
const stream$ = stream$.pipe(map((data=> ({...data, x: data.x + 1}))))
Or move it to a wrap function
const mapProp = (propName, fn) => stream$.pipe(map((data=> ({...data, [propName]: fn(data[propName])}))))
//then
const stream$ = stream$.pipe(mapProp('x', x => x+1 ))
if you want to use it for something else but map
const mapProp = (propName, fn) => stream$.pipe(
mergeMap(data =>
of(data[propName])
.pipe(
fn,
map(newPropValue => ({ ...data, propName: newPropValue })
)
)
)
//Usage
const stream$ = stream$.pipe(mapProp('x', map(x => x+1)))
As I've said in question,of()
generates artificial complete messages.
– Łukasz Szcześniak
Dec 24 '18 at 12:28
add a comment |
You can do something like this
const stream$ = stream$.pipe(map((data=> ({...data, x: data.x + 1}))))
Or move it to a wrap function
const mapProp = (propName, fn) => stream$.pipe(map((data=> ({...data, [propName]: fn(data[propName])}))))
//then
const stream$ = stream$.pipe(mapProp('x', x => x+1 ))
if you want to use it for something else but map
const mapProp = (propName, fn) => stream$.pipe(
mergeMap(data =>
of(data[propName])
.pipe(
fn,
map(newPropValue => ({ ...data, propName: newPropValue })
)
)
)
//Usage
const stream$ = stream$.pipe(mapProp('x', map(x => x+1)))
You can do something like this
const stream$ = stream$.pipe(map((data=> ({...data, x: data.x + 1}))))
Or move it to a wrap function
const mapProp = (propName, fn) => stream$.pipe(map((data=> ({...data, [propName]: fn(data[propName])}))))
//then
const stream$ = stream$.pipe(mapProp('x', x => x+1 ))
if you want to use it for something else but map
const mapProp = (propName, fn) => stream$.pipe(
mergeMap(data =>
of(data[propName])
.pipe(
fn,
map(newPropValue => ({ ...data, propName: newPropValue })
)
)
)
//Usage
const stream$ = stream$.pipe(mapProp('x', map(x => x+1)))
answered Dec 23 '18 at 21:41
Maksim RomanenkoMaksim Romanenko
31516
31516
As I've said in question,of()
generates artificial complete messages.
– Łukasz Szcześniak
Dec 24 '18 at 12:28
add a comment |
As I've said in question,of()
generates artificial complete messages.
– Łukasz Szcześniak
Dec 24 '18 at 12:28
As I've said in question,
of()
generates artificial complete messages.– Łukasz Szcześniak
Dec 24 '18 at 12:28
As I've said in question,
of()
generates artificial complete messages.– Łukasz Szcześniak
Dec 24 '18 at 12:28
add a comment |
For now I've came up with an idea that "preserves" data using wrap function state. I don't really like it's 'side effect' nature, although it's safest implementation I've found so far.
export function wrap<T, R>(
operator: OperatorFunction<T, R>,
) => (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source => {
let metadata: { stepId: number; streamId: number; valueId: number };
return source.pipe(
tap(
({ valueId, stepId, streamId }) =>
(metadata = { valueId, streamId, stepId: stepId + 1 }),
),
dematerialize(),
operator,
materialize(),
map(x => Object.assign(x, metadata, { timestamp: Date.now() })),
);
};
}
But it's not working, because metadata is getting overridden randomly.
add a comment |
For now I've came up with an idea that "preserves" data using wrap function state. I don't really like it's 'side effect' nature, although it's safest implementation I've found so far.
export function wrap<T, R>(
operator: OperatorFunction<T, R>,
) => (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source => {
let metadata: { stepId: number; streamId: number; valueId: number };
return source.pipe(
tap(
({ valueId, stepId, streamId }) =>
(metadata = { valueId, streamId, stepId: stepId + 1 }),
),
dematerialize(),
operator,
materialize(),
map(x => Object.assign(x, metadata, { timestamp: Date.now() })),
);
};
}
But it's not working, because metadata is getting overridden randomly.
add a comment |
For now I've came up with an idea that "preserves" data using wrap function state. I don't really like it's 'side effect' nature, although it's safest implementation I've found so far.
export function wrap<T, R>(
operator: OperatorFunction<T, R>,
) => (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source => {
let metadata: { stepId: number; streamId: number; valueId: number };
return source.pipe(
tap(
({ valueId, stepId, streamId }) =>
(metadata = { valueId, streamId, stepId: stepId + 1 }),
),
dematerialize(),
operator,
materialize(),
map(x => Object.assign(x, metadata, { timestamp: Date.now() })),
);
};
}
But it's not working, because metadata is getting overridden randomly.
For now I've came up with an idea that "preserves" data using wrap function state. I don't really like it's 'side effect' nature, although it's safest implementation I've found so far.
export function wrap<T, R>(
operator: OperatorFunction<T, R>,
) => (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
return source => {
let metadata: { stepId: number; streamId: number; valueId: number };
return source.pipe(
tap(
({ valueId, stepId, streamId }) =>
(metadata = { valueId, streamId, stepId: stepId + 1 }),
),
dematerialize(),
operator,
materialize(),
map(x => Object.assign(x, metadata, { timestamp: Date.now() })),
);
};
}
But it's not working, because metadata is getting overridden randomly.
edited Dec 27 '18 at 21:23
answered Dec 24 '18 at 12:34
Łukasz SzcześniakŁukasz Szcześniak
573516
573516
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53906967%2fwrap-operator-in-rxjs-so-it-can-be-applied-on-materialized-stream%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
dxSiWjXkPtyVQKrBFj,7ZjT JxYgcEVoI2V9qlt,f omsQKihwewmr1