LiveData + Rx(Hot observable Firebase) problem [Arquitecture question]
LiveData Rx Hot observable problem [Arquitecture question]
I have a question over transformation rx Publisher to livedata, I've been programed a project that using firebase rx livedata in a example.
The problem is that when converting a Publisher to LiveData, with LiveDataReactiveStreams library, when the subscription is inactive, the converter clear subscription and when state active create a new subscription.
Code
/**
* Defines a {@link LiveData} object that wraps a {@link Publisher}.
*
* <p>
* When the LiveData becomes active, it subscribes to the emissions from the Publisher.
*
* <p>
* When the LiveData becomes inactive, the subscription is cleared.
* LiveData holds the last value emitted by the Publisher when the LiveData was active.
* <p>
* Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
* added, it will automatically notify with the last value held in LiveData,
* which might not be the last value emitted by the Publisher.
*
* <p>
* Note that LiveData does NOT handle errors and it expects that errors are treated as states
* in the data that's held. In case of an error being emitted by the publisher, an error will
* be propagated to the main thread and the app will crash.
*
* @param <T> The type of data hold by this instance.
*/
private static class PublisherLiveData<T> extends LiveData<T> {
private final Publisher<T> mPublisher;
final AtomicReference<LiveDataSubscriber> mSubscriber;
PublisherLiveData(@NonNull Publisher<T> publisher) {
mPublisher = publisher;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() { // Problem
super.onActive();
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
@Override
protected void onInactive() { // Problem
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
}
In this class I add a boolen for decide if that has to subscribe or not
class CustomPublisherLiveData<T> extends LiveData<T> {
private boolean mResubscribe;
private final Publisher<T> mPublisher;
private final AtomicReference<LiveDataSubscriber> mSubscriber;
CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
mPublisher = publisher;
mResubscribe = resubscribe;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() {
super.onActive();
if (mResubscribe){
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
}
@Override
protected void onInactive() {
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null && mResubscribe) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
//TODO silence error I have to make the error go up to the main thread
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
Is this correct?
I must change the "livedata convert" or I'm missing something
I can a subscribe in a viewModel and set value in a new MutableLiveData but this create a more boilerplate code and i dont know is safe
Regards
android rx-java2 rx-kotlin2 livedata
add a comment |
LiveData Rx Hot observable problem [Arquitecture question]
I have a question over transformation rx Publisher to livedata, I've been programed a project that using firebase rx livedata in a example.
The problem is that when converting a Publisher to LiveData, with LiveDataReactiveStreams library, when the subscription is inactive, the converter clear subscription and when state active create a new subscription.
Code
/**
* Defines a {@link LiveData} object that wraps a {@link Publisher}.
*
* <p>
* When the LiveData becomes active, it subscribes to the emissions from the Publisher.
*
* <p>
* When the LiveData becomes inactive, the subscription is cleared.
* LiveData holds the last value emitted by the Publisher when the LiveData was active.
* <p>
* Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
* added, it will automatically notify with the last value held in LiveData,
* which might not be the last value emitted by the Publisher.
*
* <p>
* Note that LiveData does NOT handle errors and it expects that errors are treated as states
* in the data that's held. In case of an error being emitted by the publisher, an error will
* be propagated to the main thread and the app will crash.
*
* @param <T> The type of data hold by this instance.
*/
private static class PublisherLiveData<T> extends LiveData<T> {
private final Publisher<T> mPublisher;
final AtomicReference<LiveDataSubscriber> mSubscriber;
PublisherLiveData(@NonNull Publisher<T> publisher) {
mPublisher = publisher;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() { // Problem
super.onActive();
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
@Override
protected void onInactive() { // Problem
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
}
In this class I add a boolen for decide if that has to subscribe or not
class CustomPublisherLiveData<T> extends LiveData<T> {
private boolean mResubscribe;
private final Publisher<T> mPublisher;
private final AtomicReference<LiveDataSubscriber> mSubscriber;
CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
mPublisher = publisher;
mResubscribe = resubscribe;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() {
super.onActive();
if (mResubscribe){
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
}
@Override
protected void onInactive() {
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null && mResubscribe) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
//TODO silence error I have to make the error go up to the main thread
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
Is this correct?
I must change the "livedata convert" or I'm missing something
I can a subscribe in a viewModel and set value in a new MutableLiveData but this create a more boilerplate code and i dont know is safe
Regards
android rx-java2 rx-kotlin2 livedata
add a comment |
LiveData Rx Hot observable problem [Arquitecture question]
I have a question over transformation rx Publisher to livedata, I've been programed a project that using firebase rx livedata in a example.
The problem is that when converting a Publisher to LiveData, with LiveDataReactiveStreams library, when the subscription is inactive, the converter clear subscription and when state active create a new subscription.
Code
/**
* Defines a {@link LiveData} object that wraps a {@link Publisher}.
*
* <p>
* When the LiveData becomes active, it subscribes to the emissions from the Publisher.
*
* <p>
* When the LiveData becomes inactive, the subscription is cleared.
* LiveData holds the last value emitted by the Publisher when the LiveData was active.
* <p>
* Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
* added, it will automatically notify with the last value held in LiveData,
* which might not be the last value emitted by the Publisher.
*
* <p>
* Note that LiveData does NOT handle errors and it expects that errors are treated as states
* in the data that's held. In case of an error being emitted by the publisher, an error will
* be propagated to the main thread and the app will crash.
*
* @param <T> The type of data hold by this instance.
*/
private static class PublisherLiveData<T> extends LiveData<T> {
private final Publisher<T> mPublisher;
final AtomicReference<LiveDataSubscriber> mSubscriber;
PublisherLiveData(@NonNull Publisher<T> publisher) {
mPublisher = publisher;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() { // Problem
super.onActive();
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
@Override
protected void onInactive() { // Problem
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
}
In this class I add a boolen for decide if that has to subscribe or not
class CustomPublisherLiveData<T> extends LiveData<T> {
private boolean mResubscribe;
private final Publisher<T> mPublisher;
private final AtomicReference<LiveDataSubscriber> mSubscriber;
CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
mPublisher = publisher;
mResubscribe = resubscribe;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() {
super.onActive();
if (mResubscribe){
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
}
@Override
protected void onInactive() {
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null && mResubscribe) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
//TODO silence error I have to make the error go up to the main thread
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
Is this correct?
I must change the "livedata convert" or I'm missing something
I can a subscribe in a viewModel and set value in a new MutableLiveData but this create a more boilerplate code and i dont know is safe
Regards
android rx-java2 rx-kotlin2 livedata
LiveData Rx Hot observable problem [Arquitecture question]
I have a question over transformation rx Publisher to livedata, I've been programed a project that using firebase rx livedata in a example.
The problem is that when converting a Publisher to LiveData, with LiveDataReactiveStreams library, when the subscription is inactive, the converter clear subscription and when state active create a new subscription.
Code
/**
* Defines a {@link LiveData} object that wraps a {@link Publisher}.
*
* <p>
* When the LiveData becomes active, it subscribes to the emissions from the Publisher.
*
* <p>
* When the LiveData becomes inactive, the subscription is cleared.
* LiveData holds the last value emitted by the Publisher when the LiveData was active.
* <p>
* Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
* added, it will automatically notify with the last value held in LiveData,
* which might not be the last value emitted by the Publisher.
*
* <p>
* Note that LiveData does NOT handle errors and it expects that errors are treated as states
* in the data that's held. In case of an error being emitted by the publisher, an error will
* be propagated to the main thread and the app will crash.
*
* @param <T> The type of data hold by this instance.
*/
private static class PublisherLiveData<T> extends LiveData<T> {
private final Publisher<T> mPublisher;
final AtomicReference<LiveDataSubscriber> mSubscriber;
PublisherLiveData(@NonNull Publisher<T> publisher) {
mPublisher = publisher;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() { // Problem
super.onActive();
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
@Override
protected void onInactive() { // Problem
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
}
In this class I add a boolen for decide if that has to subscribe or not
class CustomPublisherLiveData<T> extends LiveData<T> {
private boolean mResubscribe;
private final Publisher<T> mPublisher;
private final AtomicReference<LiveDataSubscriber> mSubscriber;
CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
mPublisher = publisher;
mResubscribe = resubscribe;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() {
super.onActive();
if (mResubscribe){
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
}
@Override
protected void onInactive() {
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null && mResubscribe) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
//TODO silence error I have to make the error go up to the main thread
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
Is this correct?
I must change the "livedata convert" or I'm missing something
I can a subscribe in a viewModel and set value in a new MutableLiveData but this create a more boilerplate code and i dont know is safe
Regards
android rx-java2 rx-kotlin2 livedata
android rx-java2 rx-kotlin2 livedata
asked Dec 28 '18 at 13:01
Cristian M.GCristian M.G
212
212
add a comment |
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53959027%2flivedata-rxhot-observable-firebase-problem-arquitecture-question%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53959027%2flivedata-rxhot-observable-firebase-problem-arquitecture-question%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown