RxPY Observable 변환하는 방법에 대하여 알아보기
오늘은 RxPY에서 Observable을 변환해주는 방법들을 실습해보려 합니다.
RxPY는 Reactive Extensions의 python 라이브러리입니다.
RxPY 설치
pip3 install rx
rx-1.6.x 버전이 설치되며,
현재 최신 버전인 3.0으로 설치하려면
pip3 install -pre rx
pre 옵션을 주어야 합니다.
저 같은 경우에는 python3로 설치하려고 pip3에서 진행했습니다.
RxPY creation
from rx import *
rxpy를 사용하기 위해서 rx 패키지를 가져옵니다.
class ObserverClass(Observer):
def on_next(self, value):
print("print : " + str(value))
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error : "+str(error))
Observer를 상속 받은 옵저버 클래스를 만들어줍니다.
여기에는 on_next, on_error, on_completed 메소드로 구성됩니다.
이를 이용해서 생성한 Observable을 옵저버 클래스에 구독해줍니다.
Observable.from_((1, 2, 3)).map(lambda x: x * 2).subscribe(ObserverClass())
Observable.from_(range(10, 20, 2)).map(
lambda x, i: "%s / %s" % (i, x * 2)).subscribe(ObserverClass())
가장 기본적으로 변환하려면 map이 있습니다.
람다식을 지나서 Observable이 원하는 형태로 변환됩니다.
Observable.range(1, 2).flat_map(
lambda x: Observable.range(x, 2)).subscribe(ObserverClass())
flat_map은 람다식을 지나서 Observable이 원하는 형태로 변한 후에 다시 Observable로 변환됩니다.
Observable.range(1, 2).flat_map_latest(
lambda x: Observable.range(x, 2)).subscribe(ObserverClass())
flat_map_latest는 새로운 Observable이 생겼을 때에 기존의 Observable은 무시됩니다.
def sel(*arr):
return '-'.join([str(i) for i in arr])
Observable.from_(('a', 'b', 'c')).flat_map(lambda x, i: (
x, i), sel).subscribe(ObserverClass())
flat_map에 selector를 사용할 수도 있습니다.
Observable.from_([{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]
).pluck('y').subscribe(ObserverClass())
기존의 Observable에서 pluck로 원하는 것만 뽑아서 Observable을 변환할 수 있습니다.
class test:
def __init__(self, x, y):
self.x = x
self.y = y
Observable.from_([test(1, 2), test(3, 4)]).pluck_attr(
'x').subscribe(ObserverClass())
pluck_attr으로 속성 값을 추출해서 Observable을 변환할 수 있습니다.
Observable.from_(('a', 'b', 'c')).to_blocking(
).many_select(lambda x: x.first()).merge_all().subscribe(ObserverClass())
many_select는 원본 Observable에서 방출된 각 인덱스로 인하여 순서대로 Observable을 변환할 수 있습니다.
Observable.from_(('1', '2', '3')).to_blocking().scan(lambda x,
y: int(x) + int(y)).subscribe(ObserverClass())
scan으로 람다식에 의해 Observable의 기존의 x에 값을 유지하고 y를 더해줍니다.
Observable.from_(('1', '2', '3')).to_blocking(
).timestamp().pluck_attr('timestamp').subscribe(ObserverClass())
timestamp로 변환된 Observable에서 timestamp만 뽑아서 구독합니다.
Observable.from_(('1', '2', '3')).to_blocking().time_interval().map(
lambda x: x.interval).subscribe(ObserverClass())
time_interval로 연속적으로 Observable의 변환이 일어나는 시간 간격을 구독합니다.