본문 바로가기

Server/RabbitMQ

2. Rabbitpy를 이용해 RabbitMQ 다루기

728x90
반응형
RabbitMQ를 사용할 때 참고하기 위해 작성한 글

 

목차

     

    개요

    Python에서는 AMQP protocol에 따른 통신을 수행하기 위해 유명한 라이브러리인 "pika"라는 게 존재하지만 여기서는 "rabbitpy"를 통해 RabbitMQ를 다뤄보기로 하자.

    본 포스팅에서 작성할 내용은 Exchange, Queue, Binding을 어떻게 코드로 작성하는가에 대한 기본적인 내용만 담을 것이다.

     

    rabbitpy, install

    $ pip install rabbitpy

     

    rabbitpy, Connection

    rabbitpy를 이용해 exchange, queue, binding 기타 등등의 다양한 기능을 다루기 전에 rabbitmq에 대한 connection을 열어야 한다. connection 객체에는 RabbitMQ의 URI를 나타내는 정보를 넘겨줘야 한다.

    import rabbitpy
    
    connection = rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2F')
    connection.close()

    Connection 객체 인수로 넘겨준 문자열의 끝에 "%2F"는 "/"가 URL 인코딩을 적용한 값인데 굳이 왜 URL 인코딩까지 적용하여 넘겨야 하는지는 찾지 못했다. Connection 객체에 넘겨준 문자열은 의미론적으로 다음과 같다.

    amqp://[USERID]:[PASSWORD]@[RABBIMQ_HOST]:[RABBITMQ_PORT]/[VHOST]

    python의 context manager를 이용해 connection을 여는 것도 가능하다.

    with rabbitpy.Connection() as conn: ...

     

    rabbitpy, Channel

    connection을 열었다면 다음 단계로  channel을 열어야 한다. 

    import rabbitpy
    
    channel = connection.channel()
    channel.close()

    conneciton과 마찬가지로 channel도 context manager를 사용할 수 있다.

    with rabbitpy.Connection() as connection:
        with connection.channel() as channel:
        	...

     

    rabbitpy, Exchange 생성

    RabbitMQ에 메시지를 발행하려면 Exchange가 필요하다. rabbitpy에서는 다음과 같이 Exchange를 선언한다.

    import rabbitpy
    
    url = 'amqp://guest:guest@localhost:5672/'
    
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            exchange = rabbitpy.Exchange(channel, "my-ex")
            exchange.declare()

    exchange를 인스턴스화 시키면 declare() 메서드를 사용할 있다. declare 메서드는 exchange name 존재하지 않으면 RabbitMQ Exchange 생성된다.

     

    rabbitpy, Queue 생성

    Exchange를 선언했으니 Exchange에 연결하여 사용할 Queue도 선언하자. Queue를 선언할 때는 채널과 Queue의 이름을 인자로 전달하면 된다. 아래 예제에서 Queue의 이름은 "my-queue"이다.

    import rabbitpy
    
    url = 'amqp://guest:guest@localhost:5672/'
    
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            exchange = rabbitpy.Exchange(channel, "my-ex")
            exchange.declare()
    
            queue = rabbitpy.Queue(channel, "my-queue")
            queue.declare()

    queue도 declare() 메서드를 통해 RabbitMQ에 Queue를 생성한다. queue의 declare 메서드를 print나 특정 변수에 바인딩하여 결과를 찍어보면 해당 큐에 들어있는 메시지 수와 큐에 대한 소비자 수를 튜플로 출력한다.

     

    RabbitMQ에 이미 Queue가 존재할 때 유용하게 써먹도록 하자.

     

     

    rabbitpy, Binding

    Exchange와 Queue를 연결하는 건 앞의 포스팅에서 Binding이라고 적어놨다. rabbitpy에서는 queue에서 bind() 메서드를 통해 이를 수행할 수 있다.

    import rabbitpy
    
    url = 'amqp://guest:guest@localhost:5672/'
    
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            exchange = rabbitpy.Exchange(channel, "my-ex")
            exchange.declare()
    
            queue = rabbitpy.Queue(channel, "my-queue")
            queue.declare()
    
            res = queue.bind(exchange, "my-queue-routing-key")
            print(res)
    
    # OutPut
    # True

    나중에 알아볼 Exchange Type에서는 라우팅 키를 기반으로 메시지를 라우팅 할 수 있다. exchange에 queue를 binding 할 때 routing key를 잘 구분하는 것도 중요하다.

     

    rabbitpy, Message Publish

    이제 생성한 Exchange에 message를 보내보자.

    import rabbitpy
    
    url = 'amqp://guest:guest@localhost:5672/'
    
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            exchange = rabbitpy.Exchange(channel, "my-ex")
            exchange.declare()
    
            queue = rabbitpy.Queue(channel, "my-queue")
            queue.declare()
    
            queue.bind(exchange, "my-queue-routing-key")
    
            message = rabbitpy.Message(channel, "Test Message")
            message.publish(exchange, "my-queue-routing-key")

    rabbitpy.Message 객체를 이용하여 message를 보낼 수 있다. message를 보낼 때는 여러 옵션을 지정할 수 있다. 메시지 내구성 설정, 메시지 배달 보장 등과 같은 것들인데 나중에 다루기로 하고 지금은 아주 간단한 형태의 메시지 발행만을 보도록 하자.

     

    rabbitpy.Message를 통해 message instance를 얻었으면 publish() 메서드를 이용해 메시지를 보내면 끝이다.

     

     

    rabbitpy, Message Consume

    메시지를 발행했으니 메시지를 소비하는 것도 해보도록 하자.

    import rabbitpy
    
    url = 'amqp://guest:guest@localhost:5672/'
    
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            queue = rabbitpy.Queue(channel, "my-queue")
    
            while len(queue) > 0:
                message = queue.get()
                print(message.body)
                message.ack()

    메시지를 가져올 Queue 객체에 대한 인스턴스를 얻게 되면 queue.get()으로 message를 가져올 수 있는데 여기서 주목할만한 점은 queue.get()을 통해 가져온 message가 객체라는 점이다. 아래는 message 인스턴스의 type이다.

    <class 'rabbitpy.message.Message'>

    이 message는 다음과 같은 속성을 가진다. 그때 그 때 찾아서 사용하도록 하자.

    app_id
    content_type
    content_encoding
    correlation_id
    delivery_mode
    expiration
    headers
    message_id
    message_type
    priority
    reply_to
    timestamp
    user_id
    728x90
    반응형

    'Server > RabbitMQ' 카테고리의 다른 글

    3. Delivery Guarantee - Mandatory  (1) 2023.10.07
    1. RabbitMQ  (0) 2023.09.19