RxPY Observable 생성하는 방법에 대하여 알아보기

오늘은 RxPY에서 Observable을 생성해주는 방법들을 실습해보려 합니다.

RxPY는 Reactive Extensions의 python 라이브러리입니다.

RxPY 설치

pip3 install rx

rx-1.6.1이 설치되며,

현재 최신 버전인 3.0으로 설치하려면

pip3 install -pre rx

pre 옵션을 주어야 합니다.

저 같은 경우에는 python3로 설치하려고 pip3에서 진행했습니다.

RxPY creation

from rx import *
import random
import time

rx 패키지에서 전부 가져오고, random, time 패키지도 가져옵니다.

class ObserverClass(Observer):

    def on_next(self, value):
        print("print : " + str(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error : "+str(error))

Observer를 상속 받은 옵저버 클래스를 만들어줍니다. 여기에는 on_next, on_error, on_completed 메소드로 구성됩니다.

이를 이용해서 생성한 Observable들을 옵저버 클래스에 구독해줍니다.

Observable.just({'answer': random.randint(0, 10)}).subscribe(ObserverClass())

just는 인자에 들어간 데이터들을 바로 Observable로 변환해줍니다.

Observable.start(func=lambda: random.randint(0, 10)).subscribe(ObserverClass())

start는 함수의 결과를 바로 Observable로 변환해줍니다.

Observable.range(0, 3).subscribe(ObserverClass())

range는 주어진 범위를 iterable한 Observable로 만들어줍니다.

gen = (random.randint(0, 10) for j in range(3))
Observable.from_iterable(gen).subscribe(ObserverClass())

from_iterable은 엘리먼트들을 관찰가능한 Observable로 변환해줍니다.

def g(f, a, b):
    f(a, b)
Observable.from_callback(lambda a, b, f: g(f, a, b))(
    'foo', 'bar').subscribe(ObserverClass())

from_callback도 from_iterable와 다를 것이 없습니다.

Observable.repeat({'rand': time.time()}, 3).subscribe(ObserverClass())

repeat은 반복하여 Observable을 만듭니다.

Observable.generate(0, lambda x: x < 5, lambda x:  x + 1.0,
                    lambda x: x * 2).subscribe(ObserverClass())

generate는 시작할 정수, 조건식, 초기식, 증감식을 사용하요 Observable을 만듭니다.

Observable.defer(lambda: Observable.just(
    random.randint(0, 10))).subscribe(ObserverClass())

defer는 구독하는 시간에 Observable을 만듭니다.

Observable.if_then(
    lambda: True, Observable.return_value(10), Observable.return_value(20)).subscribe(ObserverClass())

if_then은 첫번째 인자가 참일 경우와 거짓일 경우를 결과값으로 나눌 수 있습니다.

Observable.empty().subscribe(ObserverClass())

empty는 출력되는 Observable이 존재하지 않습니다.

Observable.never().subscribe(ObserverClass())

never는 empty와 같지만, 종료되는 시점도 않습니다.

Observable.throw(ZeroDivisionError).subscribe(ObserverClass())

throw는 오류를 발생시킵니다.

Written on March 11, 2019