Python rq 패키지 알아보고 rq-scheduler도 사용해보기

오늘은 Python에서 작업을 대기열에 추가하고, 워커로 처리할 수 있는 rq 라이브러리를 알아보려 합니다.

rq 패키지 설치

우선 virtualenv로 파이썬 환경을 분리합니다.

pip3 install virtualenv
virtualenv -mvenv env

env라는 이름의 가상 환경을 생성합니다.

source env/bin/activate

가상환경을 원하는 폴더에서 활성화합니다.

pip3 install --upgrade pip

pip의 업그레이드가 존재하는지 확인하고 진행합니다.

pip install rq

pip로 rq 패키지를 설치합니다.

pip install redis==3.5.3

해당 패키지는 Redis 기반이라서 redis 패키지를 3.0.0 보다 큰 버전으로 설치해야 합니다.

Redis 설정

우선 redis 환경을 구축해두었다면 직접 서버를 열어도 되지만,

redis-server

도커 환경을 이용할 수도 있습니다.

version: '3.2'

services:
  redis:
    image: redis:6.0.9-alpine
    ports:
      - "6379:6379"

rq 패키지를 사용하는 서비스에서 해당 redis 도커 컨테이너를 연결하려면 links로 연결합니다.

from redis import Redis

REDIS_HOST = 'redis'
REDIS_PORT = 6379
REDIS_DB = 0

redis_connection = Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)

redis 환경이 다 구축되었다면 파이썬 코드에서 Redis 객체를 생성합니다.

워커 생성

대기열을 가진 워커를 생성하기 위해서 “rq worker” 명령어를 수행하거나, 도커로 별도 프로세스를 구동할 수도 있습니다.

  rq_worker:
    build:
      context: .
      dockerfile: .Dockerfile
    command: rq worker --url redis://redis:6379 main
    depends_on:
      - redis
    links:
      - redis

Dockerfile에서 중요한 점은 파이썬 환경에 rq 패키지가 설치되어 있어야 한다는 점입니다.

queue = Queue('main', connection=redis_connection)

미리 생성한 redis 커넥션으로 main 워커가 담당하는 대기열을 만들어줍니다.

작업 생성

def run_task():
    # 테스트 함수
    return {}
job = queue.enqueue(run_task, job_id="custom_id")

만약 워커로 위 run_task 함수를 수행하려면 대기열에 보냅니다.

작업 조회

job = Job.fetch(job_id, connection=redis_connection)

기간이 만료되지 않은 작업에 대해서는 작업에 대한 정보를 알 수 있습니다.

볼 수 있는 속성에 대한 자세한 내용은 아래에 정리했습니다.

  • job.id : 작업의 고유 아이디

  • job.get_status() : “finished”와 같은 작업의 상태

  • job.func_name : 작업이 수행하는 함수 이름

  • job.args : 작업이 수행하는 함수 인수

  • job.result : 작업 결과

  • job.exc_info : 작업 오류

  • job.enqueued_at : 대기열에 들어간 시점

  • job.started_at : 시작 시점

  • job.ended_at : 끝 시점

finished_registry = FinishedJobRegistry('main', connection=redis_connection)
started_registry = StartedJobRegistry('main', connection=redis_connection)
scheduled_registry = ScheduledJobRegistry('main', connection=redis_connection)
failed_registry = FailedJobRegistry('main', connection=redis_connection)
deferred_registry = DeferredJobRegistry('main', connection=redis_connection)

만약 작업이 어느 상태에 들어갔는지 궁금하다면, 각 결과에 맞는 JobRegistry로 작업 목록을 가져올 수 있습니다.

finished_registry.get_job_ids()

finished 상태의 작업들을 조회하고자 하면 FinishedJobRegistry로 get_job_ids 함수를 호출합니다.

만약 result_ttl에 대해 별도로 지정하지 않았다면 500초 동안 유지되는 동안 계속 볼 수 있습니다.

cron 표현식

cron 표현식으로 대기열 스케줄을 관리하려면 rq-scheduler 패키지를 추가로 설치해야 합니다.

우선 이전에 만들어둔 워커에 –with-scheduler 옵션을 부여합니다.

rqscheduler --host localhost --port 6379 --db 0

그리고 redis가 localhost:6379로 연결되었다면 위와 같이 rqscheduler 명령어를 이용하거나,

  scheduler:
    build:
      context: .
      dockerfile: .Dockerfile
    command: rqscheduler --host redis --port 6379 --db 0
    depends_on:
      - redis
    links:
      - redis

워커와 마찬가지로 스케줄러도 도커 컨테이너를 생성할 수 있습니다.

scheduler = Scheduler('main', connection=redis_connection)
scheduler.cron("* * * * *", func=run_task, id="custom_id")

스케줄러 객체를 생성하고, cron 표현식으로 작업을 예약할 수 있습니다.

만약 1회성으로 작업 예약하려고 datetime을 사용한다면, cron이 아닌 schedule로 코드를 변경해야 합니다.

jobs = scheduler.get_jobs(with_times=True)

대기열에 넣어진 작업은 get_jobs로 볼 수 있습니다.

scheduler.cancel(job_id)

앞으로 특정 작업을 수행하고 싶지 않다면, 작업을 취소할 수 있습니다.

대시보드

웹으로 대기열을 보는 방식이 있습니다.

pip install rq-dashboard

pip로 rq-dashboard 패키지를 설치하거나,

  rq_dashboard:
    image: eoranged/rq-dashboard:latest
    ports:
      - 9181:9181
    links:
      - redis
    environment:
      - RQ_DASHBOARD_REDIS_URL=redis://redis:6379

이미 만들어진 도커 이미지로 컨테이너를 만들어 다른 도커 컨테이너와 같이 수행할 수 있습니다.

9181 포트에 접근하면, 웹 UI로 작업과 대기열 그리고 워커에 대한 정보를 볼 수 있습니다.

Written on November 1, 2020