Python 이벤트 브로트캐스팅하는 Blinker 라이브러리 알아보기
오늘은 Python으로 이벤트(신호)를 브로드캐스팅할 수 있는 Blinker를 알아보려 합니다.
Blinker 설치
우선 virtualenv로 파이썬 환경을 분리해줍니다.
pip3 install virtualenv
virtualenv -mvenv env
env라는 이름의 가상 환경을 생성합니다.
source env/bin/activate
가상환경을 폴더에서 활성화합니다.
pip3 install --upgrade pip
pip의 업그레이드가 존재하는지 확인하고 진행합니다.
pip install blinker
pip로 Blinker를 설치합니다.
hello world
from blinker import signal
signal을 가져옵니다.
started = signal('started')
이름있는 신호 객체를 만듭니다.
def round_function(num):
print("Round : %s" % num)
def round_two(num):
print(num)
보낸 신호를 수신할 때에 실행할 두 함수를 만듭니다.
started.connect(round_function)
started.connect(round_two, sender=2)
send 함수로 수신할 수 있는 receiver 함수를 연결합니다.
for i in range(1, 4):
started.send(i)
1부터 4까지 송신합니다.
결과적으로 2가 송수신될 때는 round_two 함수도 수행됩니다.
signal
from blinker import signal
signal을 가져옵니다.
def subscribe_function(sender):
print(sender)
sig = signal('ready')
sig.connect(subscribe_function)
신호를 수신할 때 수행할 함수를 만들고 연결합니다.
class Processor:
def __init__(self, name):
self.name = name
def executor(self):
ready = signal('ready')
ready.send(self)
def __repr__(self):
return '<Processor %s>' % self.name
if __name__ == '__main__':
test = Processor('test')
test.executor()
Processor 객체를 만들어서 executor 함수를 수행하면 이전에 만든 ready 신호를 송신할 수 있습니다.
data signal
from blinker import signal
signal을 가져옵니다.
send_data = signal('send-data')
@send_data.connect
def receive_data(sender, **kw):
print(sender, kw)
값을 수신하여 수행할 함수를 만들 수 있습니다.
result = send_data.send('anonymous', value="value")
send_data라는 신호를 보낼 함수에 값을 넣어서 보낼 수 있습니다.
Written on June 8, 2019