Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

Pub/Sub

REDIS는 다양한 데이터 타입을 지원하는 메시지 큐로 알려져 있다. 그런 REDIS가 Pub/Sub를 지원한다. Pub/Sub는 메시지큐와는 특성이 다르다. Pub/Sub 시스템에서는 채널에 구독 신청을 한 모든 subscriber에게 메시지를 전달한다. 메시지를 "던지는" 시스템이기 때문에, 메시지를 보관하지도 않는다. 메시지 큐 본연의 목적과는 좀 다른 기능이라고 할 수 있겠다. 어쨋든 REDIS에서 제공하는 기능이고, 최근 효율적인 PUB/SUB 구축에 대한 요구도 있고해서 살펴보기로 했다. REDIS에 대한 기본적인 지식(설치, 간단한 운용, 지원하는 데이터 타입에 대한 이해)은 가지고 있다고 가정한다.

SUBSCRIBE 명령을 이용해서 채널을 구독할 수 있다. 매개변수로 채널 이름이 들어간다. 하나 이상의 채널에 대한 구독도 가능하다. test 채널과 qa 채널에 대해서 구독 신청을 했다.
> SUBSCRIBE test qa

PUBLISH 명령을 이용해서 채널에 메시지를 발행할 수 있다.
> PUBLISH test

Push 메시지 데이터 형식

Push 메시지는 3개의 요소들로 구성된 "배열값"을 전송한다.
  1. Push 메시지 타입 : 어떤 종류의 메시지인지 알려준다. "subscribe", "message", "unsubscribe" 3가지 타입의 메시지가 있다.
    • subscribe : 채널을 성공적으로 subscribe 했다.
    • message : 채널로 부터 전송된 일반 메시지
    • unsubscribe : 채널을 성공적으로 unsubscribe 했다.
  2. Channel name : Subscribe한 채널 이름
  3. Message : 전송된 메시지
최초 Subscribe 했을 때의 메시지다.
> SUBSCRIBE mytopic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "mytopic"
3) (integer) 1

"Hello world" 메시지를 publish 했다.
> PUBLISH mytopic "Hello world"
(integer) 1

Subscribe측 메시지 정보다.
> SUBSCRIBE mytopic
1) "message"
2) "mytopic"
3) "Hello world"
mytopic에서 "Hello world"메시지가 도착했음을 알 수 있다.

Database & Scope

PUB/SUB은 Database와 상관없이 작동한다. 15번 db에서 PUB를 한 메시지를 1번 db에서도 받아볼 수 있다.

5번 데이터베이스에서 "Hello world"를 publish했다.
> select 5
OK
[5]> PUBLISH mytopic "Hello world"
(integer) 1

2번 데이터베이스에 있는 클라이언트도 subscribe 할 수 있다.
> select 2
OK
[2]> SUBSCRIBE mytopic
1) "message"
2) "mytopic"
3) "Hello world"

패턴매칭 subscribe

채널이름에 패턴매칭을 지원한다. "news."에서 발행하는 모든 메시지를 sub 하고 싶다면 아래와 같이 채널이름을 설정하면 된다.
> PSUBSCRIBE news.*

프로그래밍

개발 환경은 다음과 같다.
  • 언어 : Ruby 2.1.2p95
  • 운영체제 : 우분투 리눅스 14.10
redis gem을 설치하자
# gem install redis

PUB 프로그램

Channel에 Publish하는 간단한 프로그램이다.
# cat pub.rb
require 'redis'
require 'json'

class Publisher
    @name = nil
    @channel = nil
    @redis = nil

    def initialize args
        @name =args[:name]
        @channel = args[:channel]
        @redis = Redis.new(:host=>"192.168.57.2")
    end

    def run
        data = {"user"=>@name}
        loop do
            print "> "
            msg = STDIN.gets
            @redis.publish @channel, data.merge("msg"=>msg.strip).to_json
        end
    end

end

name = ARGV[0]
channel = ARGV[1]
puts "Name : #{name}"
puts "channel : #{channel}"
pub = Publisher.new({:name=>name, :channel=>channel})
pub.run
프로그램은 "name"과 "channel"이름을 매개변수로 받는다.
# ruby pub.rb yundream chatting 
Name : yundream
channel : chatting
> Hello world
>

redis-cli의 MONITOR 명령으로 확인을 했다.
# redis-cli -h 192.168.57.2
> MONITOR 
1415955481.013326 [0 192.168.57.1:37574] "publish" "chatting" "{\"user\":\"yundream\",\"msg\":\"Hello world\"}"

SUB 프로그램

require 'rubygems'
require 'redis'
require 'json'

class Subscriber
    @channel = nil
    @redis = nil
    def initialize args
        @channel = args[:channel]
        @redis = Redis.new(:host=>"192.168.57.2")
    end
    def run
        @redis.subscribe(@channel) do |on|
            on.message do |channel, msg|
                data = JSON.parse(msg)
                puts "##{channel} #{data['user']} : #{data['msg']}"
            end
        end
    end
end

channel = ARGV[0]
puts "channel : #{channel}"
sub = Subscriber.new({:channel=>channel})
sub.run

응용

좀 더 생각거리가 있는 응용 프로그램을 만들어 보고 싶다. 뉴스 구독 시스템을 만들기로 했다.

구성

  1. Sinatra는 Sub Client를 별도의 쓰레드로 운용한다. REDIS Server로 부터 SUB한 정보를 웹 소켓에 쓴다. Sinatra는 웹 소켓 목록을 유지하고 있어야 한다.
  2. Web browser와 Sinatra와는 websocket로 연결한다.
  3. PUB Client 프로그램으로 REDIS PUB Server에 메시지를 publish 한다.
  4. SUB Client는 REDIS Server로 부터 메시지를 읽고, 연결한 웹 소켓에 쓴다.
제대로 구현하려면 회색부분도 추가해야 할 거다. 유저가 연결하지 않은 상태에서 전달된 메시지는 메시지함(REDIS Msg Queue)에 적재 하는등의 컴포넌트가 필요하지만, 완전히 작동하는 웹 애플리케이션을 만드는게 목적은 아니니 구현은 하지 않기로 한다.

개발 범위

제대로 개발 하려면,
  1. 유저는 구독할 뉴스 토픽을 선택
  2. 웹 애플리케이션 서버는 유저가 구독한 토픽에 대해서만 PUB 해야 할 것이다.
하지만 모든 토픽을 클라이언트에게 전달할 것이다. 구현하기 귀찮다.

메시지 포멧

JSON 메시지 포멧을 사용한다.
{
  "topic": "토픽 이름",
  "msg": "메시지"
}

웹 애플리케이션 서버

Ruby Sinatra로 개발했다.
# encoding: utf-8             
require 'sinatra-websocket'   
require 'redis'               
require 'json'                
  
class MyApp < Sinatra::Application
    # 클라이언트(웹 브라우저)의 웹 소캣 객체를 저장한다.
    $ws_a = Hash.new          
    Thread.new do             
        redis = Redis.new(:host=>'192.168.56.5')
        redis.subscribe("message") do | on |
            # 읽은 메시지는 모든 클라이언트 웹 소캣에 그대로 쓴다.
            on.message do |channel, msg|   
                $ws_a.each do | name, ws|      
                    puts "Send Data #{msg}"        
                    ws.send(msg)                   
                end
            end               
        end
    end

    # localhost:3000/?name=yundream  형식으로 요청한다.
    # name 파라메터는 클라이언트를 식별하기 위해서 사용하고 있다. 
    # 세션을 만들어야 겠지만 귀찮아서.(모든게 귀찮음 으로 해결된다.)
    get "/" do
        @name = params[:name]
        erb :index
    end

    get "/message" do
        @name = params[:name]
        puts "User is #{@name}"        
        if !request.websocket?
            erb :error
        else
            request.websocket do |ws|      
                # 웹 소켓이 연결되면 소켓 목록에 웹 소켓을
                # 저장한다.  
                ws.onopen do
                    $ws_a[@name] = ws              
                    puts "Connection OK #{@name}"  
                end

                ws.onmessage do |msg|          
                end

                ws.onclose do
                    if $ws_a.has_key? @name        
                        puts "Remove websocket #{@name}"
                        $ws_a.delete @name             
                    end
                end
            end
        end
    end
end

index.erb

아래는 Index.erb에 사용한 코드다. 주석으로 대신한다. 이왕 만드는 거 foundation으로 예쁘게 만들어 보고 싶었으나.. 귀찮아서 UI는 포기
<!doctype html>
<!--[if IE 9]><html class="lt-ie10" lang="en" > <![endif]-->
<!--[if IE 10]><html class="ie10" lang="en" > <![endif]-->
<html class="no-js" lang="en" data-useragent="Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)">
    <head>
        <link rel="stylesheet" href="/css/foundation.css" />
        <link rel="stylesheet" href="/foundation-icons/foundation-icons.css" />
        <script src="/js/vendor/modernizr.js"></script>
        <script src="/js/vendor/jquery.js"></script>
        <script src="/js/foundation/foundation.js"></script>
        <script src="/js/foundation/foundation.reveal.js"></script>
        <script src="/js/foundation/foundation.dropdown.js"></script>
        <meta http-equiv="Content-Type" content="text/html;charset=UTF-8" />

        <script>
            // 웹 소켓으로 부터 읽은 데이터를 출력한다.
            function addItem(obj) {
                $("#container").append(
                    '<div class="large-12 columns">'+obj.topic+':'+obj.msg+'</div>')
            }
            // 웹 소켓으로 부터 데이터를 읽는다.
            function message() {
                var ws = new WebSocket('ws://' + window.location.host + '/message?name='+'yundream');
                ws.onopen    = function()  {};
                ws.onclose   = function()  {}
                ws.onmessage = function(m) {
                    var obj = JSON.parse(m.data)
                    addItem(obj);
                };
            }
            $(document).ready(function() {
                message();
            });
        </script>
    </head>
    <body>
<!-- 데이터는 여기에 출력한다. -->
<div class="row" id="container">
</div>

</body>

테스트

웹 브라우저로 "localshot:3000/?name=username"형식으로 접근하면 된다.

PUB 클라이언트 만들기 귀찮아서 그냥 redis-cli로 테스트 했다.
# redis-cli -h 192.168.56.5
> PUBLISH message "{\"topic\":\"news\",\"msg\":\"This is web socket test\"}"

정리

MQTT와의 비교

MQTT도 비슷한 용도로 사용할 수 있는데(사실 MQTT의 전문분야라고 봐야 겠지만) 기능 보다는 사용환경에 있어서 차이가 있다. MQTT는 저전력, 신뢰할 수 없는 네트워크 환경에서의 작동을 목표로 하고 있다. 또한 QoS를 이용해서, 환경에 따라서 적절하게 서비스 품질을 조절할 수 있는 장점이 있다.

나는 IoT환경에서의 메시지 교환을 목적으로 REDIS를 테스트 했다. IoT 환경을 기준으로 내 생각을 정리 한다.
  • 모바일 기기 혹은 IoT기기와의 통신에는 MQTT를 사용한다.
  • 인프라 내부 컴포넌트들간의 메시지 교환에는 REDIS를 이용한다. 메시지 인프라에는 다양한 성격의 컴포넌트들이 존재한다. 이런 환경에서는 다양한 데이터 타입을 제공하는 REDIS가 강점을 가질 수 있다. 메시지 함, PUB/SUB 시스템, 연결 테이블 관리, 라우팅 테이블 관리등을 단일 소프트웨어로 구현할 수 있는 것은 큰 장점이다.

하고픈 것들

  • 확장 가능한 REDIS PUB/SUB 시스템 설계. REDIS 클러스터 구현 쪽을 살펴봐야 겠다.
  • 메시지 함의 구현
  • 프로토 타이핑 프로그램의 개선. 지금은 딱 작동하는 수준으로 만들었다. UI도 그렇고.. GitHub에 올린다음 개선 작업을 해야 겠다.

참고

  • http://redis.io/topics/pubsub