Wrap operator in RxJS so it can be applied on materialized stream

I'm looking for a way to trace a value's 'route' through stream operators. I have a materialized stream with additional metadata on each Notification object (e.g. a valueId property). Its definition looks something like this:

    const x = stream.pipe(
      materialize(),
      map(x => Object.assign(x, {valueId: randomInt()}))
    );

Now I need to wrap operators that are applied to x. Let's say I need to use map(x => x * 2), but I cannot do it like this:

    x.pipe(dematerialize(), map(x => x * 2))

because I would lose my metadata. How can I write a wrap function that applies to any operator and still preserves my additional metadata?

    x.pipe(wrap(map(x => x * 2)))

I thought about something like this:

    function wrap<T, R>(
      operator: OperatorFunction<T, R>
    ): (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
      return source =>
        source.pipe(
          switchMap(x =>
            of(x).pipe(
              dematerialize(),
              operator,
              materialize(),
              map(z => Object.assign(z, { valueId: x.valueId }))
            )
          )
        );
    }

But it generates spurious complete notifications, because each inner of() completes right after emitting.
Sample: https://stackblitz.com/edit/rxjs-fhv54p
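
To make the source of the extra notifications concrete, here is a minimal, dependency-free sketch. It is not RxJS: it models a materialized stream as a plain array of notification objects, and the names Note, materializeOf and wrapPerValue are made up for this illustration. It only assumes what the question states, namely that every value passes through its own of() before materialize().

```typescript
// Simplified stand-in for an RxJS notification (hypothetical type, not the library's).
type Note<T> =
  | { kind: 'N'; value: T }
  | { kind: 'E'; error: unknown }
  | { kind: 'C' };

// Models `of(value).pipe(materialize())`: one next notification followed
// by the complete notification that of() always produces.
function materializeOf<T>(value: T): Note<T>[] {
  return [{ kind: 'N', value: value }, { kind: 'C' }];
}

// Models the per-value inner pipeline from the question for map(x => x * 2):
// every source value goes through its own of(), so every value drags a 'C' along.
function wrapPerValue(values: number[]): Note<number>[] {
  const out: Note<number>[] = [];
  for (const v of values) {
    for (const n of materializeOf(v * 2)) {
      out.push(n);
    }
  }
  return out;
}

const kindsSeen: string = wrapPerValue([1, 2, 3])
  .map(n => n.kind)
  .join(' ');
// kindsSeen is "N C N C N C": three values, three artificial completes.
```

In this model the stream never had a real complete of its own, yet three 'C' entries show up, one per value.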










Tags: rxjs, reactivex

Asked Dec 23 '18 at 20:33 by Łukasz Szcześniak; edited Dec 27 '18 at 22:21 by Łukasz Szcześniak.

3 Answers

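The issue with your approach is that the complete notification of each inner of() is materialized and passed as a value to the observer of source.

Try this:

    import { Observable, OperatorFunction, of } from 'rxjs';
    import { dematerialize, map, materialize, switchMap } from 'rxjs/operators';

    // Note: uuid here plays the role of the question's valueId.
    function wrap<T, R>(op: OperatorFunction<T, R>):
      (source: Observable<any>) => Observable<any> {
      return source => {
        return source.pipe(
          // Run the operator on each notification separately and carry the id along with the result.
          switchMap(x => of(x)
            .pipe(
              dematerialize(),
              op,
              map(y => ({value: y, uuid: x.uuid}))
            )
          ),
          // Materialize once, after the inner streams have been flattened.
          materialize(),
          // Unpack the {value, uuid} pair back onto the notification.
          map((x: any) => Object.assign(x, {
            value: x.value ? x.value.value : undefined,
            uuid: x.value ? x.value.uuid : undefined
          })),
        )
      }
    }

Demo: https://stackblitz.com/edit/rxjs-gxj7zl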
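
A related idea, sketched here only as an illustration, is to send just the value notifications through the operator and forward error and complete notifications untouched. The sketch is a dependency-free model rather than RxJS code: an "operator" is simplified to a function from one input value to zero or more outputs (enough for map or filter, but not for stateful operators such as scan or buffer), and Tagged, SyncOp and wrapSync are hypothetical names. The kind values 'N', 'E' and 'C' mirror the ones RxJS notifications use.

```typescript
// Simplified stand-ins (hypothetical, not RxJS types).
type Tagged<T> =
  | { kind: 'N'; value: T; valueId: number }
  | { kind: 'E'; error: unknown; valueId: number }
  | { kind: 'C'; valueId: number };

// A stateless operator, modelled as: one input value -> zero or more outputs.
type SyncOp<T, R> = (value: T) => R[];

function wrapSync<T, R>(op: SyncOp<T, R>): (source: Tagged<T>[]) => Tagged<R>[] {
  return source => {
    const out: Tagged<R>[] = [];
    for (const n of source) {
      if (n.kind === 'N') {
        // Only values go through the operator; each result inherits the tag.
        for (const r of op(n.value)) {
          out.push({ kind: 'N', value: r, valueId: n.valueId });
        }
      } else {
        // Error and complete notifications are forwarded as they are,
        // so no artificial completes appear and the real one keeps its tag.
        out.push(n);
      }
    }
    return out;
  };
}

const double: SyncOp<number, number> = v => [v * 2];
const onlyEven: SyncOp<number, number> = v => (v % 2 === 0 ? [v] : []);

const input: Tagged<number>[] = [
  { kind: 'N', value: 1, valueId: 101 },
  { kind: 'N', value: 2, valueId: 102 },
  { kind: 'C', valueId: 103 },
];

const doubled = wrapSync(double)(input);
const evens = wrapSync(onlyEven)(input);
```

Here doubled keeps one notification per input with its original valueId, and evens drops the odd value together with its tag. Because each value is processed in isolation, this shape cannot express operators that combine several inputs.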






• Yeah, you are right. However, this idea does not work for buffer-like operators, and it breaks after the first non-value notification (complete or error), because those cannot be mapped.
  – Łukasz Szcześniak, Jan 5 at 22:40

You can do something like this:

    const result$ = stream$.pipe(map(data => ({...data, x: data.x + 1})));

Or move it into a wrapper function:

    const mapProp = (propName, fn) =>
      map(data => ({...data, [propName]: fn(data[propName])}));

    // then
    const result$ = stream$.pipe(mapProp('x', x => x + 1));

If you want to use it with operators other than map:

    const mapProp = (propName, op) =>
      mergeMap(data =>
        of(data[propName]).pipe(
          op,
          map(newPropValue => ({...data, [propName]: newPropValue}))
        )
      );

    // Usage
    const result$ = stream$.pipe(mapProp('x', map(x => x + 1)));

• As I said in the question, of() generates artificial complete notifications.
  – Łukasz Szcześniak, Dec 24 '18 at 12:28

For now I've come up with an idea that "preserves" the data using state held inside the wrap function. I don't really like its side-effect nature, although it's the safest implementation I've found so far.

    export function wrap<T, R>(
      operator: OperatorFunction<T, R>,
    ): (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
      return source => {
        // Tags of the most recent input notification, shared by every output.
        let metadata: { stepId: number; streamId: number; valueId: number };

        return source.pipe(
          tap(
            ({ valueId, stepId, streamId }) =>
              (metadata = { valueId, streamId, stepId: stepId + 1 }),
          ),
          dematerialize(),
          operator,
          materialize(),
          map(x => Object.assign(x, metadata, { timestamp: Date.now() })),
        );
      };
    }

But it's not working, because metadata is getting overwritten unpredictably.
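
A likely explanation for the overwriting is that metadata is a single slot shared by all in-flight values: whenever the wrapped operator emits later than it receives (buffering, delaying, debouncing), the output is stamped with the tags of whichever input arrived most recently. The dependency-free sketch below reproduces that effect with a hypothetical push-style pair buffer standing in for a buffer-like operator. None of the names (pairBuffer, Stamped, latestValueId, feed) are RxJS APIs.

```typescript
// Hypothetical push-style stand-in for a buffer-like operator:
// it collects values and emits them in pairs.
function pairBuffer(emit: (pair: number[]) => void): (value: number) => void {
  let pending: number[] = [];
  return value => {
    pending.push(value);
    if (pending.length === 2) {
      emit(pending);
      pending = [];
    }
  };
}

interface Stamped {
  pair: number[];
  valueId: number;
}

// Single shared slot, as in the wrap() above: overwritten on every input.
let latestValueId = -1;
const stamped: Stamped[] = [];

// Downstream of the operator: stamp each output with whatever the slot holds now.
const feed = pairBuffer(pair => {
  stamped.push({ pair: pair, valueId: latestValueId });
});

const inputs = [
  { value: 10, valueId: 1 },
  { value: 20, valueId: 2 },
  { value: 30, valueId: 3 },
  { value: 40, valueId: 4 },
];

for (const item of inputs) {
  latestValueId = item.valueId; // the tap() step
  feed(item.value);             // dematerialize() + operator
}

// stamped[0] is { pair: [10, 20], valueId: 2 }: the tag of 10 (valueId 1) is gone.
// stamped[1] is { pair: [30, 40], valueId: 4 }: likewise for 30 (valueId 3).
```

In this synchronous model the loss is deterministic. With operators that emit on timers, which input happens to be "latest" at emission time depends on scheduling, which would make the result look random.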






Answer 1 (re-materializing after switchMap): answered Jan 3 at 21:23 by jal, edited Jan 16 at 23:32.
Answer 2 (mapProp): answered Dec 23 '18 at 21:41 by Maksim Romanenko.
Answer 3 (metadata kept in wrap state): answered Dec 24 '18 at 12:34 by Łukasz Szcześniak, edited Dec 27 '18 at 21:23.