개발/안드로이드

RxJava - Create 함수

kwony 2019. 8. 11. 13:36

앞서 작성한 포스트에선 Observable의 역할이 어떤 데이터를 Observer가 처리할 수 있도록 포장해주는 역할 이라고 설명했다. Observable은 데이터를 관찰 할 수 있는 형태로 만들 기 위해 여러 가지 오퍼레이터 함수를 가지고 있다. 이번 포스트에선 이중에서 대표적으로 사용되는 것들만 소개해보려고 한다.


1. create


백그라운드 스레드에서 옵저버가 처리할 넘겨주는 방법. 아래 코드를 보면 create 함수의 인자로 익명 ObservableOnSubscribe 클래스를 선언하고 이 안의 오버라이드 함수 인자인 emitter 변수에 onNext로 0~9까지 값을 넣어 호출 하는 것을 볼 수 있다.


Observable<Integer> observable = Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());

                for (int i = 0; i < 10; i++) {
                    if (!emitter.isDisposed())
                        emitter.onNext(i);
                }

                if (!emitter.isDisposed())
                    emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

observable.subscribe(new Observer<Integer>() {
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer);
    }
});


실행결과 아래와 같은 로그가 출력된다. 0~9까지 값이 옵저버에게 전달 됐다. 추가적으로 subscribe에서 돌고 있는 쓰레드 이름을 출력해보니 RxCachedThreadScheduler-1이란 것이 출력됐다. 이는 subscribeOn 함수에 백그라운드 함수중 하나인 Schedulers.io()를 넣었기 때문이다. 이 함수를 사용하면 Observable한 데이터를 만드는 작업을 백그라운드에서 실행 할 수 있게 된다.



2. just 


최대 10개까지의 배열 데이터를 Observable하게 만들 수 있는 함수. 그런데 10개로 제한돼서 배열을 전달하는 경우는 없고 단일 데이터를 전달 할 때 주로 사용된다.


Observable<Integer> observable = Observable
        .just(1,2,3)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

observable.subscribe(new Observer<Integer>() {
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer);
    }
});


이 함수를 실행해보면 다음과 같은 로그가 나온다.



3. range 


범위를 지정해주는 방법. for문을 생성 함수의 하나로 뒀다고 생각하면 쉽다. 단 range 함수를 사용하면 map같은 형변환 오퍼레이터를 사용하지 않으면 옵저버에선 Integer의 형태로 값을 받게 된다.


Observable<Integer> observable = Observable
        .range(0, 5)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

observable.subscribe(new Observer<Integer>() {
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer);
    }
});


4. fromIterable 


이미 생성된 배열의 값을 Observable하게 바꿔주는 함수. 앞서 설명한 just는 최대 10개까지 밖에 담지 못한 반면 fromIterable은 개수에 상관 없이 모두 Observable하게 바꿔준다.


List<Integer> integers = new ArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);

Observable<Integer> observable = Observable
        .fromIterable(integers)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

observable.subscribe(new Observer<Integer>() {
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer);
    }
});