LiveData + Rx(Hot observable Firebase) problem [Arquitecture question]












0















LiveData Rx Hot observable problem [Arquitecture question]



I have a question over transformation rx Publisher to livedata, I've been programed a project that using firebase rx livedata in a example.



The problem is that when converting a Publisher to LiveData, with LiveDataReactiveStreams library, when the subscription is inactive, the converter clear subscription and when state active create a new subscription.



Code



   /**
* Defines a {@link LiveData} object that wraps a {@link Publisher}.
*
* <p>
* When the LiveData becomes active, it subscribes to the emissions from the Publisher.
*
* <p>
* When the LiveData becomes inactive, the subscription is cleared.
* LiveData holds the last value emitted by the Publisher when the LiveData was active.
* <p>
* Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
* added, it will automatically notify with the last value held in LiveData,
* which might not be the last value emitted by the Publisher.
*
* <p>
* Note that LiveData does NOT handle errors and it expects that errors are treated as states
* in the data that's held. In case of an error being emitted by the publisher, an error will
* be propagated to the main thread and the app will crash.
*
* @param <T> The type of data hold by this instance.
*/
private static class PublisherLiveData<T> extends LiveData<T> {
private final Publisher<T> mPublisher;
final AtomicReference<LiveDataSubscriber> mSubscriber;

PublisherLiveData(@NonNull Publisher<T> publisher) {
mPublisher = publisher;
mSubscriber = new AtomicReference<>();
}

@Override
protected void onActive() { // Problem
super.onActive();
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}

@Override
protected void onInactive() { // Problem
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null) {
s.cancelSubscription();
}
}

final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {

@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}

@Override
public void onNext(T item) {
postValue(item);
}

@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);

ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}

@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}

public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
}


In this class I add a boolen for decide if that has to subscribe or not



class CustomPublisherLiveData<T> extends LiveData<T> {

private boolean mResubscribe;
private final Publisher<T> mPublisher;
private final AtomicReference<LiveDataSubscriber> mSubscriber;

CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
mPublisher = publisher;
mResubscribe = resubscribe;
mSubscriber = new AtomicReference<>();
}

@Override
protected void onActive() {
super.onActive();
if (mResubscribe){
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
}

@Override
protected void onInactive() {
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null && mResubscribe) {
s.cancelSubscription();
}
}

final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {

@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}

@Override
public void onNext(T item) {
postValue(item);
}

@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);

//TODO silence error I have to make the error go up to the main thread
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);

ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}

@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}

public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}


Is this correct?
I must change the "livedata convert" or I'm missing something



I can a subscribe in a viewModel and set value in a new MutableLiveData but this create a more boilerplate code and i dont know is safe



Regards










share|improve this question



























    0















    LiveData Rx Hot observable problem [Arquitecture question]



    I have a question over transformation rx Publisher to livedata, I've been programed a project that using firebase rx livedata in a example.



    The problem is that when converting a Publisher to LiveData, with LiveDataReactiveStreams library, when the subscription is inactive, the converter clear subscription and when state active create a new subscription.



    Code



       /**
    * Defines a {@link LiveData} object that wraps a {@link Publisher}.
    *
    * <p>
    * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
    *
    * <p>
    * When the LiveData becomes inactive, the subscription is cleared.
    * LiveData holds the last value emitted by the Publisher when the LiveData was active.
    * <p>
    * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
    * added, it will automatically notify with the last value held in LiveData,
    * which might not be the last value emitted by the Publisher.
    *
    * <p>
    * Note that LiveData does NOT handle errors and it expects that errors are treated as states
    * in the data that's held. In case of an error being emitted by the publisher, an error will
    * be propagated to the main thread and the app will crash.
    *
    * @param <T> The type of data hold by this instance.
    */
    private static class PublisherLiveData<T> extends LiveData<T> {
    private final Publisher<T> mPublisher;
    final AtomicReference<LiveDataSubscriber> mSubscriber;

    PublisherLiveData(@NonNull Publisher<T> publisher) {
    mPublisher = publisher;
    mSubscriber = new AtomicReference<>();
    }

    @Override
    protected void onActive() { // Problem
    super.onActive();
    LiveDataSubscriber s = new LiveDataSubscriber();
    mSubscriber.set(s);
    mPublisher.subscribe(s);
    }

    @Override
    protected void onInactive() { // Problem
    super.onInactive();
    LiveDataSubscriber s = mSubscriber.getAndSet(null);
    if (s != null) {
    s.cancelSubscription();
    }
    }

    final class LiveDataSubscriber extends AtomicReference<Subscription>
    implements Subscriber<T> {

    @Override
    public void onSubscribe(Subscription s) {
    if (compareAndSet(null, s)) {
    s.request(Long.MAX_VALUE);
    } else {
    s.cancel();
    }
    }

    @Override
    public void onNext(T item) {
    postValue(item);
    }

    @Override
    public void onError(final Throwable ex) {
    mSubscriber.compareAndSet(this, null);

    ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
    @Override
    public void run() {
    // Errors should be handled upstream, so propagate as a crash.
    throw new RuntimeException("LiveData does not handle errors. Errors from "
    + "publishers should be handled upstream and propagated as "
    + "state", ex);
    }
    });
    }

    @Override
    public void onComplete() {
    mSubscriber.compareAndSet(this, null);
    }

    public void cancelSubscription() {
    Subscription s = get();
    if (s != null) {
    s.cancel();
    }
    }
    }
    }


    In this class I add a boolen for decide if that has to subscribe or not



    class CustomPublisherLiveData<T> extends LiveData<T> {

    private boolean mResubscribe;
    private final Publisher<T> mPublisher;
    private final AtomicReference<LiveDataSubscriber> mSubscriber;

    CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
    mPublisher = publisher;
    mResubscribe = resubscribe;
    mSubscriber = new AtomicReference<>();
    }

    @Override
    protected void onActive() {
    super.onActive();
    if (mResubscribe){
    LiveDataSubscriber s = new LiveDataSubscriber();
    mSubscriber.set(s);
    mPublisher.subscribe(s);
    }
    }

    @Override
    protected void onInactive() {
    super.onInactive();
    LiveDataSubscriber s = mSubscriber.getAndSet(null);
    if (s != null && mResubscribe) {
    s.cancelSubscription();
    }
    }

    final class LiveDataSubscriber extends AtomicReference<Subscription>
    implements Subscriber<T> {

    @Override
    public void onSubscribe(Subscription s) {
    if (compareAndSet(null, s)) {
    s.request(Long.MAX_VALUE);
    } else {
    s.cancel();
    }
    }

    @Override
    public void onNext(T item) {
    postValue(item);
    }

    @Override
    public void onError(final Throwable ex) {
    mSubscriber.compareAndSet(this, null);

    //TODO silence error I have to make the error go up to the main thread
    throw new RuntimeException("LiveData does not handle errors. Errors from "
    + "publishers should be handled upstream and propagated as "
    + "state", ex);

    ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
    @Override
    public void run() {
    // Errors should be handled upstream, so propagate as a crash.
    throw new RuntimeException("LiveData does not handle errors. Errors from "
    + "publishers should be handled upstream and propagated as "
    + "state", ex);
    }
    });
    }

    @Override
    public void onComplete() {
    mSubscriber.compareAndSet(this, null);
    }

    public void cancelSubscription() {
    Subscription s = get();
    if (s != null) {
    s.cancel();
    }
    }
    }


    Is this correct?
    I must change the "livedata convert" or I'm missing something



    I can a subscribe in a viewModel and set value in a new MutableLiveData but this create a more boilerplate code and i dont know is safe



    Regards










    share|improve this question

























      0












      0








      0








      LiveData Rx Hot observable problem [Arquitecture question]



      I have a question over transformation rx Publisher to livedata, I've been programed a project that using firebase rx livedata in a example.



      The problem is that when converting a Publisher to LiveData, with LiveDataReactiveStreams library, when the subscription is inactive, the converter clear subscription and when state active create a new subscription.



      Code



         /**
      * Defines a {@link LiveData} object that wraps a {@link Publisher}.
      *
      * <p>
      * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
      *
      * <p>
      * When the LiveData becomes inactive, the subscription is cleared.
      * LiveData holds the last value emitted by the Publisher when the LiveData was active.
      * <p>
      * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
      * added, it will automatically notify with the last value held in LiveData,
      * which might not be the last value emitted by the Publisher.
      *
      * <p>
      * Note that LiveData does NOT handle errors and it expects that errors are treated as states
      * in the data that's held. In case of an error being emitted by the publisher, an error will
      * be propagated to the main thread and the app will crash.
      *
      * @param <T> The type of data hold by this instance.
      */
      private static class PublisherLiveData<T> extends LiveData<T> {
      private final Publisher<T> mPublisher;
      final AtomicReference<LiveDataSubscriber> mSubscriber;

      PublisherLiveData(@NonNull Publisher<T> publisher) {
      mPublisher = publisher;
      mSubscriber = new AtomicReference<>();
      }

      @Override
      protected void onActive() { // Problem
      super.onActive();
      LiveDataSubscriber s = new LiveDataSubscriber();
      mSubscriber.set(s);
      mPublisher.subscribe(s);
      }

      @Override
      protected void onInactive() { // Problem
      super.onInactive();
      LiveDataSubscriber s = mSubscriber.getAndSet(null);
      if (s != null) {
      s.cancelSubscription();
      }
      }

      final class LiveDataSubscriber extends AtomicReference<Subscription>
      implements Subscriber<T> {

      @Override
      public void onSubscribe(Subscription s) {
      if (compareAndSet(null, s)) {
      s.request(Long.MAX_VALUE);
      } else {
      s.cancel();
      }
      }

      @Override
      public void onNext(T item) {
      postValue(item);
      }

      @Override
      public void onError(final Throwable ex) {
      mSubscriber.compareAndSet(this, null);

      ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
      @Override
      public void run() {
      // Errors should be handled upstream, so propagate as a crash.
      throw new RuntimeException("LiveData does not handle errors. Errors from "
      + "publishers should be handled upstream and propagated as "
      + "state", ex);
      }
      });
      }

      @Override
      public void onComplete() {
      mSubscriber.compareAndSet(this, null);
      }

      public void cancelSubscription() {
      Subscription s = get();
      if (s != null) {
      s.cancel();
      }
      }
      }
      }


      In this class I add a boolen for decide if that has to subscribe or not



      class CustomPublisherLiveData<T> extends LiveData<T> {

      private boolean mResubscribe;
      private final Publisher<T> mPublisher;
      private final AtomicReference<LiveDataSubscriber> mSubscriber;

      CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
      mPublisher = publisher;
      mResubscribe = resubscribe;
      mSubscriber = new AtomicReference<>();
      }

      @Override
      protected void onActive() {
      super.onActive();
      if (mResubscribe){
      LiveDataSubscriber s = new LiveDataSubscriber();
      mSubscriber.set(s);
      mPublisher.subscribe(s);
      }
      }

      @Override
      protected void onInactive() {
      super.onInactive();
      LiveDataSubscriber s = mSubscriber.getAndSet(null);
      if (s != null && mResubscribe) {
      s.cancelSubscription();
      }
      }

      final class LiveDataSubscriber extends AtomicReference<Subscription>
      implements Subscriber<T> {

      @Override
      public void onSubscribe(Subscription s) {
      if (compareAndSet(null, s)) {
      s.request(Long.MAX_VALUE);
      } else {
      s.cancel();
      }
      }

      @Override
      public void onNext(T item) {
      postValue(item);
      }

      @Override
      public void onError(final Throwable ex) {
      mSubscriber.compareAndSet(this, null);

      //TODO silence error I have to make the error go up to the main thread
      throw new RuntimeException("LiveData does not handle errors. Errors from "
      + "publishers should be handled upstream and propagated as "
      + "state", ex);

      ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
      @Override
      public void run() {
      // Errors should be handled upstream, so propagate as a crash.
      throw new RuntimeException("LiveData does not handle errors. Errors from "
      + "publishers should be handled upstream and propagated as "
      + "state", ex);
      }
      });
      }

      @Override
      public void onComplete() {
      mSubscriber.compareAndSet(this, null);
      }

      public void cancelSubscription() {
      Subscription s = get();
      if (s != null) {
      s.cancel();
      }
      }
      }


      Is this correct?
      I must change the "livedata convert" or I'm missing something



      I can a subscribe in a viewModel and set value in a new MutableLiveData but this create a more boilerplate code and i dont know is safe



      Regards










      share|improve this question














      LiveData Rx Hot observable problem [Arquitecture question]



      I have a question over transformation rx Publisher to livedata, I've been programed a project that using firebase rx livedata in a example.



      The problem is that when converting a Publisher to LiveData, with LiveDataReactiveStreams library, when the subscription is inactive, the converter clear subscription and when state active create a new subscription.



      Code



         /**
      * Defines a {@link LiveData} object that wraps a {@link Publisher}.
      *
      * <p>
      * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
      *
      * <p>
      * When the LiveData becomes inactive, the subscription is cleared.
      * LiveData holds the last value emitted by the Publisher when the LiveData was active.
      * <p>
      * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
      * added, it will automatically notify with the last value held in LiveData,
      * which might not be the last value emitted by the Publisher.
      *
      * <p>
      * Note that LiveData does NOT handle errors and it expects that errors are treated as states
      * in the data that's held. In case of an error being emitted by the publisher, an error will
      * be propagated to the main thread and the app will crash.
      *
      * @param <T> The type of data hold by this instance.
      */
      private static class PublisherLiveData<T> extends LiveData<T> {
      private final Publisher<T> mPublisher;
      final AtomicReference<LiveDataSubscriber> mSubscriber;

      PublisherLiveData(@NonNull Publisher<T> publisher) {
      mPublisher = publisher;
      mSubscriber = new AtomicReference<>();
      }

      @Override
      protected void onActive() { // Problem
      super.onActive();
      LiveDataSubscriber s = new LiveDataSubscriber();
      mSubscriber.set(s);
      mPublisher.subscribe(s);
      }

      @Override
      protected void onInactive() { // Problem
      super.onInactive();
      LiveDataSubscriber s = mSubscriber.getAndSet(null);
      if (s != null) {
      s.cancelSubscription();
      }
      }

      final class LiveDataSubscriber extends AtomicReference<Subscription>
      implements Subscriber<T> {

      @Override
      public void onSubscribe(Subscription s) {
      if (compareAndSet(null, s)) {
      s.request(Long.MAX_VALUE);
      } else {
      s.cancel();
      }
      }

      @Override
      public void onNext(T item) {
      postValue(item);
      }

      @Override
      public void onError(final Throwable ex) {
      mSubscriber.compareAndSet(this, null);

      ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
      @Override
      public void run() {
      // Errors should be handled upstream, so propagate as a crash.
      throw new RuntimeException("LiveData does not handle errors. Errors from "
      + "publishers should be handled upstream and propagated as "
      + "state", ex);
      }
      });
      }

      @Override
      public void onComplete() {
      mSubscriber.compareAndSet(this, null);
      }

      public void cancelSubscription() {
      Subscription s = get();
      if (s != null) {
      s.cancel();
      }
      }
      }
      }


      In this class I add a boolen for decide if that has to subscribe or not



      class CustomPublisherLiveData<T> extends LiveData<T> {

      private boolean mResubscribe;
      private final Publisher<T> mPublisher;
      private final AtomicReference<LiveDataSubscriber> mSubscriber;

      CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
      mPublisher = publisher;
      mResubscribe = resubscribe;
      mSubscriber = new AtomicReference<>();
      }

      @Override
      protected void onActive() {
      super.onActive();
      if (mResubscribe){
      LiveDataSubscriber s = new LiveDataSubscriber();
      mSubscriber.set(s);
      mPublisher.subscribe(s);
      }
      }

      @Override
      protected void onInactive() {
      super.onInactive();
      LiveDataSubscriber s = mSubscriber.getAndSet(null);
      if (s != null && mResubscribe) {
      s.cancelSubscription();
      }
      }

      final class LiveDataSubscriber extends AtomicReference<Subscription>
      implements Subscriber<T> {

      @Override
      public void onSubscribe(Subscription s) {
      if (compareAndSet(null, s)) {
      s.request(Long.MAX_VALUE);
      } else {
      s.cancel();
      }
      }

      @Override
      public void onNext(T item) {
      postValue(item);
      }

      @Override
      public void onError(final Throwable ex) {
      mSubscriber.compareAndSet(this, null);

      //TODO silence error I have to make the error go up to the main thread
      throw new RuntimeException("LiveData does not handle errors. Errors from "
      + "publishers should be handled upstream and propagated as "
      + "state", ex);

      ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
      @Override
      public void run() {
      // Errors should be handled upstream, so propagate as a crash.
      throw new RuntimeException("LiveData does not handle errors. Errors from "
      + "publishers should be handled upstream and propagated as "
      + "state", ex);
      }
      });
      }

      @Override
      public void onComplete() {
      mSubscriber.compareAndSet(this, null);
      }

      public void cancelSubscription() {
      Subscription s = get();
      if (s != null) {
      s.cancel();
      }
      }
      }


      Is this correct?
      I must change the "livedata convert" or I'm missing something



      I can a subscribe in a viewModel and set value in a new MutableLiveData but this create a more boilerplate code and i dont know is safe



      Regards







      android rx-java2 rx-kotlin2 livedata






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Dec 28 '18 at 13:01









      Cristian M.GCristian M.G

      212




      212
























          0






          active

          oldest

          votes











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53959027%2flivedata-rxhot-observable-firebase-problem-arquitecture-question%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          0






          active

          oldest

          votes








          0






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes
















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53959027%2flivedata-rxhot-observable-firebase-problem-arquitecture-question%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Monofisismo

          Angular Downloading a file using contenturl with Basic Authentication

          Olmecas