MQTT, Message Queuing Telemetry Trasnport
MQTT?
MQTT는 Message Queuing Telemetry Transport의 약자이며 Pub/Sub 모델 기반의 메시지 전송 프로토콜이다. M2M(Machine to Machine) 혹은 IoT(Internet of Things)에 적합하다 (경량이며 전력 소모량이 낮음)
MQTT, Pub/Sub 모델
MQTT Pub/Sub 모델은 발행-구독 모델이며 발행자가 Message를 수신할 곳을 특정하지 않으며 Message를 수신 받을 자가 누구인지 몰라도 된다. 수신자가 특정되지 않았는데 수신자는 어떻게 Message를 전달받을 수 있을까?
발행자는 메시지를 보낼 때 Broker에게 Message를 전달하며 이 때 Message에 Topic을 담아 전달하게 된다. 그리고 수신자는 Broker를 통해 특정 Topic을 구독하고 Topic을 통해 Message를 전달받는다.
수신자는 Broker의 특정 Topic을 보고 있는 것이기 때문에 수신이라기보다는 구독이라는 개념을 사용한다.
정리하자면 발행자와 구독자는 느슨하게 결합되어있는 형태이며 관심사는 Topic에 있으므로 서로 동작 방식을 몰라도 메시지를 주고받을 수 있다는 것이다.
MQTT, Broker
클라이언트-서버 모델에서 서버는 데이터를 들고 있다 클라이언트의 요청 시점에 데이터를 주는데 이와 비슷한 역할을 하는 것이 Broker이다.
때문에 Server를 Broker로 생각할 수 있는데 유의해야 될 것은 Broker는 Client 간의 데이터를 단지 중계해주는 역할만 한다는 점이다.
Message Format
MQTT의 메시지에는 Header, Topic, Payload가 존재한다. Topic의 최대 크기는 64kb이며 Payload의 최대 크기는 256mb이다.
Topic은 슬래시(/)를 이용해 계층적으로 구성할 수 있으며 한 마디로 root/sub1/sub2와 같은 방식으로 사용할 수 있다.
RabbitMQ로 MQTT 셋팅
MQTT의 Broker로 RabbitMQ를 이용해 보고자 한다. Docker를 이용해 RabbitMQ를 이용할 것이며 세팅 방식은 다음과 같다
# 5672: AMQP
# 15672: Management
# 1883: MQTT
# 설치 & 컨테이너 실행
sudo docker run -it -p 5672:5672 -p 15672:15672 -p 1883:1883 --name rabbitmq rabbitmq:3-management
sudo docker exec -it rabbitmq /bin/bash
# 계정 추가
rabbitmqctl add_user {USERID} {PASSWORD}
rabbitmqctl set_user_tags {USERID} administrator
rabbitmqctl set_permissions -p / {USERID} ".*" ".*" ".*"
# Plugin 활성화
rabbitmq-plugins enable rabbitmq_mqtt
# management plugin 활성화
rabbitmq-plugins enable rabbitmq_management
# 설치 확인
rabbbitmq-plugins list
Publisher Source
import paho.mqtt.client as mqtt_client
import json
mqtt_client = mqtt_client.Client("publisher")
mqtt_client.connect("127.0.0.1", 1883)
mqtt_client.loop_start()
message = json.dumps({"key": "value"})
mqtt_client.publish("a/b", message, 1)
mqtt_client.loop_stop()
Subscribe Source
import paho.mqtt.client as mqtt_client
def on_message(client, userdata, message):
print(f"Message Recv : {message.payload}")
print(f"Message Topic : {message.topic}")
print(f"Message qos: {message.qos}")
print(f"Message retain flag: {message.retain}")
def on_connect(client, userdata, flags, rc):
print(f"rc : {rc}")
print("=====")
client = mqtt_client.Client("client1")
client.connect("127.0.0.1", 1883)
client.subscribe("a/b")
client.on_message = on_message
client.on_connect = on_connect
client.loop_forever()
Result