Managing thread schedulers when concat observables

android rx-java

247 просмотра

2 ответа

I have a code like this:

service.getUserById(10)
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .concatMap(getFullUserFromDto())
    .subscribe(doSomehingWithUser());

 private Func1<UserDto, Observable<User>> getFullUserFromDto() {
     return new Func1<UserDto, Observable<User>>() {
         @Override
         public Observable<User> call(final UserDto dto) {
             return dao.getUserById(dto.getUserId());
         }
     };
 }

and in my DAO, I have:

 public Observable<User> getUserById(final Long id) {
        return api.getUserById(id).map(//more things...
 }

Note there are two levels of "concatenation": service -> dao -> api. Method api.getUserById(id) make a network call.

I'm getting NetworkOnMainThreadException error. Why? I'm using and subscribeOn and observeOn operators, but it seems that it is not applied to the "final" built Observable.

If I use this operators in the API call, in the DAO, it works:

return api.getUserById(id)
     .subscribeOn(Schedulers.newThread())
     .observeOn(AndroidSchedulers.mainThread())
     .map(//more things...

Is there a way to use just once in the "root" Observable?

Автор: Héctor Источник Размещён: 08.11.2019 11:14

Ответы (2)


1 плюс

Решение

So, concatMap subscribes on Observables. What thread is used to perform this operation? Well, the thread that called onNext for the concatMat, given that it doesn't change threads/schedulers. So, one simple transposition should help with this:

service.getUserById(10)
  .subscribeOn(Schedulers.newThread())
  .concatMap(getFullUserFromDto())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(doSomehingWithUser());

I'd also suggest to use Schedulers.io(), as it will re-use threads.

Автор: Tassos Bassoukos Размещён: 20.08.2016 01:37

0 плюса

Short answer: use observeOn before chained operations to controll on which schedulers they are executed:

service.getUserById(10)
  .subscribeOn(Schedulers.newThread())
  .observeOn(Schedulers.io())
  .concatMap(getFullUserFromDto())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(doSomehingWithUser());

In the example above, .concatMap will be executed in Schedulers.io()

More details can be found here: http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

Автор: Entea Размещён: 20.08.2016 11:32
Вопросы из категории :
32x32