메뉴

문서정보

목차

소개

MQTT 소개에서 MQTT 설치와 함께, 간단한 채팅 애플리케이션을 만들었다. 한발 더 나아가 MQTT기반의 푸쉬(push) 서비스를 기획해 보려한다. 개발이 아니고 기획이라고 하는 이유는, 푸쉬 서비스에 대한 기본 적인 얼개와 대략적인 프로토타이핑 정도까지를 개발 범위로 하고 있기 때문이다. 완전한 푸쉬 소프트웨어를 만들진 않을 거다.

푸쉬 서비스를 기획하는 이유는 다음과 같다.
  1. MQTT가 IoT에서의 메시징 프로토콜로 각광을 받고 있다. 정말 그런지 가능성을 확인해 본다.
  2. IoT 환경에서 디바이스와 유저, 유저와 유저가 효과적으로 메시지를 주고 받을 수 있는 솔류션을 찾아보고 싶다.

Push 서비스 개발

개발 환경

Push 서비스 환경 구성

내가 생각하고 있는 메시징 환경이다.

응답 채널을 MQTT로 할 경우 몇 가지 응용을 할 수 있다. 메시지 전송은 HTTP, 수신은 MQTT로 하는 구성이다.

시스템 구성

IoT Push 서비스의 구성이다.

Client Ruby MQTT 클라이언트로 테스트 한다.

API Server 유저와 토픽을 관리하며, 메시지를 전송하는 일을 한다. 인증과 권한 정보는 MQTT Proxy와 공유할 수 있어야 한다.

MQTT Proxy Client는 MQTT Proxy와 연결을 한다. MQTT Proxy는 유저의 요청을 한 번 필터링해서 MQTT 브로커토 전달한다. 프락시는 인증된 유저인지, 권한을 가진 Topic에 SUB 요청을 하는지 등을 확인해야 한다.

MQTT broker Pub/Sub 메시지를 관리하는 고유의 일을 한다. 인증, 권한등의 관리는 MQTT 프락시에 일임한다.

Push 프로세스

유저인증에서 MQTT Proxy에 연결해서, Push 서비스를 받기 까지의 프로세스다.

  1. Client는 API Server에 인증을 요청한다.
  2. 인증성공하면 session을 발행한다.
  3. Client가 Push 서비스를 받기 원한다면, MQTT Proxy에 연결 요청한다.
  4. 연결 후 첫 번째 메시지로 인증확인을 위한 Auth request 메시지를 Publish 한다.
    • 인증을 위한 전용의 Auth Topic을 만든다. /service/auth/<user_session> 정도면 되겠다.
  5. Auth request 메시지를 받은 프락시 서버는 API Server에 세션이 올바른지 확인 요청을 한다. Session validation check 과정이 되겠다.
    • 실패하면 연결을 끊는다. 실패 메시지는 Auth Topic로 전송하면 되겠다.
  6. 연결 성공하면 메시지 topic에 subscribe를 요청한다.
  7. 프락시 서버는 subscribe 요청이 올바른지 확인 후 요청을 MQTT 브로커로 넘긴다.

구현

컨셉만 구현 할 거다. 애플리케이션 프로토콜도 만들어야 하고, 데이터 베이스 시스템도 준비해야 겠으나 건너뛴다. 컨셉 검증용이라고 보면 되겠다.

MQTT Proxy 구현

Push 시스템의 핵심이라고 할 수 있는 MQTT proxy 서버에 대해서 알아보겠다. MQTT 프락시는 클라이언트와 MQTT 브로커 사이의 메시지를 연산하는 일종의 미들웨어 시스템이다.

  1. Proxy의 bind port는 5000이다.
  2. 브로커의 서비스 포트는 1883(MQTT 기본 포트)이다.
Ruby mqtt 모듈에 있는 Proxy 클래스를 이용해서 구현했다. 설명은 주석으로 대신한다.
require 'rubygems'
require 'mqtt'
require 'pp'

# MQTT::Packet에 get 메서드 추가
# topic, payload(유저 메시지)
class MQTT::Packet
    def topic 
        return @topic
    end
    def payload
        return @payload
    end
end

# MQTT::Proxy의 process_packets 메서드 재 정의
# 원래의 process_packets 는 packet만 넘겨주는데, 이 걸로는  
# 이 packet에 어떤 클라이언트의 것인지 확인할 수 없다.
# 해서 클라이언트를 구분할 수 있는 값(소켓 객체의 id)을 추가로 넘기게 했다.
class MQTT::Proxy
  def process_packets(client_socket,server_socket)
    loop do
      selected = IO.select([client_socket,server_socket], nil, nil, @select_timeout)
      if selected.nil?
        # Timeout
        raise "Timeout in select"
      else
        if selected[0].include?(client_socket)
          packet = MQTT::Packet.read(client_socket)
          packet = @client_filter.call(packet, client_socket.object_id) unless @client_filter.nil?
          # packet이 nil 이라면 인증 오류인 걸로 한다.          
          if packet.nil?
            raise "Authorization Error"
          else
            server_socket.write(packet)
          end
        elsif selected[0].include?(server_socket)
          packet = MQTT::Packet.read(server_socket)
          packet = @server_filter.call(packet) unless @server_filter.nil?
          unless packet.nil?
            client_socket.write(packet)
          end
        else
            logger.error "Problem with select: socket is neither server or client"
        end
      end
    end
  end
end

class MQTTProxy
    attr_reader :proxy
    attr_reader :session_info
    attr_reader :logger
    def initialize(args)
        @proxy=MQTT::Proxy.new args
        @session_info = Hash.new

        @logger = args[:logger]
        if @logger.nil?
            @logger = Logger.new(STDOUT)
            @logger.level = Logger::INFO
        end
    end

    # 유저 세션을 검사한다. 
    # 귀찮아서 123456이면 성공인 걸로 했다.
    def validationSession? session 
        # session을 가져오기 위한 연산이 들어간다.
        puts "Check #{session == "123456"}"
        return session == "123456"
    end

    def run
        # 클라이언트 요청을 읽어서 분석한다. 
        # MQTT::Proxy에서 읽은 클라이언트의 MQTT 패킷과 id를 매개변수로 연산을 한다.
        # id는 현재 유저 세션을 가리키는 key로 socket의 object_id를 이용했다.
        @proxy.client_filter = lambda { |packet, id|
            # connect 한 후 첫번째 메시지에 대해서 세션을 검사한다.
            if @session_info.has_key?(id)
                if @session_info[id] == 0
                    return nil if !validationSession? packet.payload
                    logger.info "MQTT::Proxy User authentication success"
                end
                @session_info[id] += 1 
                @session_info['topic'] = "/user/#{id}"
            else
                @session_info[id] = 0
            end
            return packet
        }

        # server filter
        @proxy.server_filter = lambda { |packet|
            #puts "From server: #{packet.inspect}"
            return packet
        }
        @proxy.run
    end
end

proxy = MQTTProxy.new(
    :local_host => '0.0.0.0',
    :local_port => 5000,
    :server_host => 'test.mosquitto.org',
    :server_port => 1883
)
proxy.run

MQTT Client 구현

클라이언트 프로그램이다.
# name : client.rb
require 'rubygems'
require 'mqtt'
require 'readline'

class MqttClient
    attr_reader :server_host
    attr_reader :server_port
    attr_reader :server
    attr_reader :logger
    attr_reader :client_session
    attr_reader :nickname

    def initialize(args={})
        @server_host = args[:server_host] 
        @server_port = args[:server_port] 
        @nickname = args[:nickname] 
        @server = nil

        @logger = args[:logger]
        if @logger.nil?
            @logger = Logger.new(STDOUT)
            @logger.level = Logger::INFO
        end
    end

    # Login 메서드 
    # 123456을 하드 코딩했다.
    def login session 
        @server.publish "service/auth", session
    end

    def pub topic, message
        @server.publish topic, message 
    end
    def sub topic
        puts @server.get topic
    end

    def run
        begin
            @server = MQTT::Client.connect(
                :host => @server_host, 
                :port => @server_port
            )
        rescue Exception=>  exp
            puts "Error"
        end
        # User Auth 
        login "123456"

        # Publish Thread
        Thread.new do
            while message = Readline.readline("", true)
                pub "/private/123456", "#{@nickname} -> #{message}"
            end
        end

        # Subscribe
        loop do
            sub "/private/123456"
        end
    end
end

nickname = ARGV[0] 

client = MqttClient.new( 
    server_host: "localhost",
    server_port: 5000,
    client_session: "123456",
    nickname: nickname)

client.run

평가

지금까지의 프로토타이핑 결과를 토대로 MQTT를 이용한 Push 서비스 개발 가능성을 평가했다.

현실적인 설계

지금까지의 코드는 기능 검증 수준이었다. 실제 프로젝트에서 사용하고자 할때, 고려해야할 설계 요소들을 고민해 보려한다.

좀더 현실적인 설계

AWS 인프라를 기준으로 그림을 그렸다.

ELB를 이용해서 가용성과 확장성을 보장받을 수 있다. MQTT Proxy와 MQTT Broker는 서로 다른 인스턴스로 분리할 수도 있겠는데, 여기에서는 그냥 하나의 인스턴스에 두는 걸로 구성했다.

IoT 인프라는 여기에서 다루기에는 범위가 너무 넓어서, 그냥 블랙박스로 표현했다. 이미 구축된 IoT Infra에 MQTT 인터페이스를 붙인 걸로 생각하면 되겠다. IoT 인프라는 언젠가 기회가 되면 따로 고민해볼 생각이다.

이 구성은 여러 디바이스가 하나의 유저에게 publish 할 경우 깔끔하게 작동할 거다. 하지만 두 명 이상의 유저에게 publish 할 경우 고민이 좀 필요하다. 먼저 하나의 유저에게 publish 하는 경우다.

유저의 private topic는 MQTT Instance group의 여러 인스턴스 중 하나만 가지고 있고, 디바이스에서 발생한 메시지는 간단한 연산으로 해당 MQTT instance에 전송할 수 있다.

그룹 메시지 Push

그렇다 그룹 기능이다. 유저 그룹과 디바이스가 통신을 할 수 있고, 유저 그룹내에서 유저끼리의 통신도 가능하다. 디바이스 자리에 유저를 두면 된다. 메시지를 topic을 가진 인스턴스 갯수 만큼 복사하는 방법도 있는데, 간단하기는 하지만 비효율적이다. 효율적인 방법을 좀 찾아보자.

음.. 그룹 topic를 따로 만드는 방법을 생각할 수 있다.

이 방안은 구성이 단순하다는 장점이 있는데, 가입한 그룹이 많아지면 유지해야 하는 소켓이 함께 늘어나는 단점이 있다. 단점을 어떻게 해결할까 ?

클라이언트가 가지는 부담이다. 서버가 부담을 가지고 가는 것 보다는 클라이언트에게 부담을 주는게 낫겠지 ? 라고 맘편하게 생각하고 무시하는 것도 방법이다. 여기에 더해서 유저가 가입할 수 있는 그룹 topic의 총 갯수를 제한하는 방법도 함께 사용할 수있겠다. SNS 서비스라면 문제가 되겠지만 타협도 가능할 것이다. 하지만 기분이 좋지 않은 건 어쩔 수 없다. 소켓이 늘어나면, 모바일 기기의 전원 소비도 늘어날 거다. Ping 패킷 주기가 10초에 소켓이 50개라면 0.2초마다 패킷을 전송해야 한다. 무시할 수 없는 데이터다.

이 방식의 단점을 제거하거나 회피하기 위한 방법을 생각해 보자.
  1. 소켓 줄이기 : 그룹이 소수의 인스턴스에 몰아 넣을 수 있다면 좋겠는데.. 글쎄.. 이건 방법이 떠오르지 않는다.
  2. Ping 패킷 주기 줄이기 : 클라이언트 집단이 Ping 패킷을 보내고 있으므로 현재 전체 서버 상황을 알수 있으며, 그중 내가 속한 그룹을 서비스하는 서버들의 상태를 알 수 있다. 한번의 Ping 패킷으로 여러 서버의 상태 정보를 확인할 수 있으므로 ping 요청을 줄일 수 있다.
클라이언트가 할 일을 서버에서 대신 수행하는 방법도 있다.

유저는 private topic에만 붙어있으면, 알아서 그룹 메시지를 척척 배달해 준다. 클라이언트 입장에서는 깔끔하지만, 서버는 더러워진다.

Private topic 인스턴스가 MQTT Client가 되고 Group topic 인스턴스가 MQTT broker 이 되면, 유저 앞단에 ELB 구성이 가능하다. 각각의 서버 군을 어떻게 효과적으로 관리할 것인지가 주요 이슈가 되겠다. 주키퍼를 이용하던지 혹은 어떤 걸 이용하던지 간에, 관리 솔류션 도입이 필요하다. 주키퍼는 따로 다루어야 하겠다.

/group 토픽을 관리하기 위한 별도의 인스턴스를 뒀는데, 이들 인스턴스가 하는 일을 publisher에 맡기는 방법도 있다. Pubisher가 그룹에 있는 모든 유저를 검사해서 pub하는 식이다.

일단 몇 개의 인스턴스들만으로 그림을 그려봤다. 딱히 구성에 문제는 없는 것 같은데, 이 정도 그림으로는 인스턴스가 수백에서 수천으로 넘어갔을 때 제대로 작동할 지를 예상하기 힘들다. 하나 이상의 subnet으로 구성될 경우, 지역을 넘나들 경우를 상정해서 그림을 그려봐야 할 것 같다.

고가용성

서비스, topic, 메시지 3개의 영역에 대한 고가용성을 확보해야 한다.

서비스 고가용성

ELB로 가용성을 확보한다. ELB의 Keep alive time은 최대 3600초인데, 그보다 짧은 주기로 MQTT Ping를 교환하기 때문에 연결성에 문제가 생길 만한 부분은 없다.

Topic 과 메시지 고가용성

현재 구성은 하나의 채널이 하나의 인스턴스에 있는 구조다. 따라서 채널을 물고 있는 인스턴스가 맛이가면, 채널도 함께 증발해 버린다. 채널이 증발하면, 클라이언트는 다음 MQTT ping 주기에 채널이 문제가 있는 것을 확인하고 재 연결을 시도한다. 재 연결을 시도하면, ELB 그룹에 있는 MQTT 인스턴스 중 하나가 Topic을 제공해 준다. Single point 취약점을 가지고 있기는 하지만, 빠르게 장애극복(failover)이 가능하다는 점에서 ELB 구성하는 정도로 Topic 고가용성은 충분히 만족 한다고 생각한다.

위의 방식은 시스템을 단순화 할 수 있다는 장점이 있지만, Broker에 있던 메시지에 대해서는 이게 클라이언트까지 전달이 됐는지를 보장할 수 없다. 어떤 데이터의 누락도 허용하지 않는 시스템을 계획 한다면 MQTT의 QoS 설정으로 해결할 수 있다. MQTT는 3단계의 QoS 수준을 제공한다.

정리

왜 MQTT 인가

메시징 프로토콜로 XMPP가 있다. 유저와 유저가 메시지를 주고 받는다면 자원이 빵빵한 모바일기기를 이용할 테니, XMPP의 (상대적인 무거움)이 크게 문제되지는 않을 거다. 하지만 그 대상이 device의 경우에는 열악한 환경(저전력, 직비와 같은 noip)을 고려해야 한다. 가트너는 스마트폰,태블릿,PC를 제외한 IoT기기가 260억대로 늘어날 것으로 예측하고 있다. IoT의 특성상 많은 기기들이 noip환경에서 IP gateway를 경유해서 인터넷에 붙을 거라고 가정하면, 연결 기기 갯수는 더욱 늘어날 거다. 전력문제를 고민할 수 밖에 없다.

IoT 메시징을 위한 주요 프로토콜들

MQTT, AMQP, HTTP 대략 3개의 프로토콜들이 중요하게 사용될 것 같다.(MQTT와 비교해서 CoAP도 언급되는 것 같은데, 이건 더 살펴봐야 겠다.)

참고