메뉴

문서정보

목차

이 문서는 AWS CLI 환경을 갖췄다는 가정하에 작성했다.

하둡(Hadoop) MapReduce

맵리듀스(MapReduce)는 분산&병렬처리 알고리즘을 이용 클러스터링 환경에서 빅 데이터 세트를 처리하기 위한 프로그래밍 모델및 관련 구현체를 일컫는다.

맵리듀스 프로그래밍 관련 구현체중 가장 유명한 구현체가 하둡 맵리듀스다.

Hadoop streaming

하둡 스트리밍은 Hadoop에서 배포하는 유틸리티다. 이 유틸리티를 이용하면, mapper과 reducer를 다양한 스크립트로 실행 할 수 있다. 예를 들어 아래와 같은 응용이 가능하다.
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

AWS EMR

Amazon Elastic MapReduce(EMR)은 AWS에서 제공하는 관리형 하둡 프레임워크다. EC2 인스턴스를 기반으로 클러스터를 구성하며, 대량의 데이터를 비용효율적으로 처리 할 수 있다. 하둡 기반의 빅데이터 처리 시스템을 구성 할 경우 다수의 서버를 상시 준비상태로 둬야 하는데, EMR은 필요할 때 즉시, 필요한 크기 만큼만 실행 할 수 있다.

또한 Amazon S3와 DynamoDB, Aurora RDS 와 같은 AWS 데이터 스토어와의 상호작용이 쉽기 때문에 Amazon 기반에서의 원할한 데이터 작업환경을 만들 수 있다. 예를 들어 아래와 같은 구성이 가능하다.
  1. 대량의 분석 파일을 S3로 저장한다.
  2. EMR을 이용 S3로 부터 파일을 읽어서 분석 데이터를 만든다.
  3. 분석 데이터를 DynamoDB와 Aurora RDS에 저장해서 서비스에 사용 할 수 있도록 한다.
개발자는 Jupyter Notebook에 기반한 EMR NoteBooks을 분석 툴로 사용 할 수 있다.

EMR은 로그분석, 대량의 데이터 색인, ETL, 기계학습, 금융분석, IoT 등 광범위한 빅데이터를 처리하기 위해서 사용 한다.

예제

EMR을 이용해서 영어단어 갯수를 계산하는 간단한 프로그램을 만들어보자. 테스트에 사용할 데이터 파일을 다운로드 했다.
# wget http://storage.googleapis.com/books/ngrams/books/googlebooks-eng-all-1gram-20120701-x.gz
# gunzip googlebooks-eng-all-1gram-20120701-x.gz 
# head googlebooks-eng-all-1gram-20120701-x 
X'rays	1914	1	1
X'rays	1917	1	1
X'rays	1919	1	1
X'rays	1921	1	1
X'rays	1922	2	1
X'rays	1923	1	1
X'rays	1927	1	1
X'rays	1930	5	3
X'rays	1931	2	2
X'rays	1932	3	2
이 데이터에서 우리는 1900년대에 xray 단어의 출현빈도를 확인 할 수 있다. 첫번째 열은 단어이고, 두 번째는 출현 연도, 세번째 열은 총 출현횟수, 마지막 열은 출현한 책의 갯수다.

우리는 위 데이터에서 알파벳 문자로만 이루어진 (정상적인)단어를 찾아서 1999년에 어떤 단어가 나타나기 시작했는지를 살펴볼 것이다.

맵리듀스의 첫번째 단계는 mapper를 만드는 것이다. 여기에서는 특수문자등을 제거해서 깨끗한 단어를 포함한 줄(레코드)만 출력하는 프로그램을 만든다. mapper의 코드는 아래와 같다.
#!/usr/bin/env python3
 
import sys
 
def CleanWord(aword):
    """
    Function input: A string which is meant to be
       interpreted as a single word.
    Output: a clean, lower-case version of the word
    """
    # 소문자로 변환한다. 
    aword = aword.lower()
    # 문자열에서 특수문자를 제거한다. 
    for character in '.,;:\'?':
        aword = aword.replace(character,'')
    # 문자열의 길이가 0이라면 None을 반환한다. 
    if len(aword)==0:
        return None
    # 표준영어 알파벳으로 제한한다. 
    for character in aword:
        if character not in 'abcdefghijklmnopqrstuvwxyz':
            return None
    # 정리된 단어를 반환한다. 
    return aword
 
# 루프를 돌면서 표준입력 데이터를 읽는다. 
for line in sys.stdin:
    # 읽은 줄의 처음과 시작에 있는 화이트스페이스 문자를 제거하고
    # 화이트스페이스 기준으로 분해해서 배열로 만든다.
    line = line.strip().split()
    # CleanWord 함수를 이용해서 소문자 영어알파벳을 만든다. 
    word = CleanWord(line[0])
 
    # CleanWord 결과가 None인 경우 다음 줄로 넘어간다. 
    if word == None:
        continue
 
    year = int(line[1])
    occurrences = int(line[2])
 
    # Print the output: word, year, and number of occurrences
    print ('%s\t%s\t%s' % (word, year,occurrences))
실행해보자.
# cat googlebooks-eng-all-1gram-20120701-x | ./mapper.py
xrays   1914    1
xrays   1917    1
xrays   1919    1
xrays   1921    1
xrays   1922    2
xrays   1923    1
xrays   1927    1
xrays   1930    5

아래는 리듀서다.
#!/usr/bin/env python3
import sys
 
# current_word will be the word in each loop iteration
current_word = ''
# word_in_progress will be the word we have been working
# on for the last few iterations
word_in_progress = ''
 
# target_year_count is the number of word occurrences
# in the target year
target_year_count = 0
# prior_year_count is the number of word occurrenes
# in the years prior to the target year
prior_year_count = 0
 
# Define the target year, in our case 1999
target_year = 1999 
 
# 루프를 돌면서 표준입력을 읽는다. 
for line in sys.stdin:
 
    # 읽은 문자열을 탭문자로 나눈다. 
    line = line.strip().split('\t')

    # 배열의 길이가 3이 아니라면 포맷에 맞지 않는 데이터다.
    # 다음 줄을 읽는다.
    if len(line)!=3:
        continue
 
    # 이 배열에는 단어, 단어가 발생한 년도, 발생 수
    # 로 구성된다.
    current_word, year, occurrences =  line
 
    # 만약 새로 등장한 단어라면, 카운터를 0부터 새로 시작한다. 
    # 문자열들은 정렬돼 있기 때문에, 이전 문자열과 다른 것으로
    # 새로 등장한 단어인지를 검토 할 수 있다.
    if current_word != word_in_progress:
        # Word exists in target year
        # 단어의 카운트가 0 보다 크면
        if target_year_count > 0:
            # target_year 이전에 단어가 등장하지 않았다면
            # target_year에 새로 등장한 단어일 것이다.
            if prior_year_count ==0:
                # Print the cool new word and its occurrences
                print ('%s\t%s' % (word_in_progress,target_year_count))
 
        # 카운터들을 0으로 초기화 한다. 
        target_year_count = 0
        prior_year_count = 0
        word_in_progress = current_word
 
    # year와 occureences가 integer이 아닐 경우
    # 다음 루프를 진행한다.
    try:
        year = int(year)
    except ValueError:
        continue
    try:
         occurrences = int(occurrences)
    except ValueError:
        continue
 
    # 변수를 추가한다. 
    if year == target_year:
        target_year_count += occurrences
    if year < target_year:
        prior_year_count += occurrences
 
# 루프가 끝난 후에 마지막 단어가 조건에 만족하는지 검사한다.
if target_year_count > 0:
    if prior_year_count ==0:
        print ('%s\t%s' % (word_in_progress,target_year_count))
리듀서 프로그램은 target_year인 1999에 처음 출현한 단어와 출현빈도를 출력한다. 앞서 만든 맵퍼와 리듀서는 표준입출력으로 작동하기 때문에 shell에서도 테스트해볼 수 있다. 단 맵퍼의 결과는 정렬되어야 한다.
$ cat googlebooks-eng-all-1gram-20120701-x | ./mapper.py | sort -k1,1 | ./reduer.py | sort -k2,2n
...
...
xaconnectionfactory	21
xdcam	25
xmlparser	83
xadatasource	338
지금은 데이터가 4백만 줄 정도라서 개인 데스크탑에서 쉘 스크립트돌리는 것 만으로도 십초 안에 작업을 끝낼 수 있었다. 하지만 수십억 데이터라고 하면, 이야기가 달라질 것이다.

위 스크립트 예제는 크게 3개 과정으로 진행되는 걸 확인 했을 것이다.
  1. 맵퍼에서 데이터를 필터링하고
  2. 특정 필드를 키로 정렬하고
  3. 정렬한 데이터를 리듀서에게 넘겨준다.
맵리듀스도 이 3개의 과정으로 진행된다. 단지 분산된다는 점이 다를 뿐이다. 아래 그림을 보자.

 Map Reduce Flow

EMR로 테스트해보자

하둡은 대용량의 파일을 처리하기 위해서 HDFS라는 분산파일 시스템을 파일 시스템으로 사용한다. EMR의 경우 HDFS 대신에 S3를 사용한다. HDFS에 비해서 훨씬 더 효과적으로 사용 할 수 있다. HDFS 대신에 S3를 사용해야 하는 5가지 이유 문서를 읽어보자. 결론만 요약해보자면 아래와 같다.

S3 HDFS S3 vs HDFS
탄력적 운영 Yes No S3가 더욱 탄력적이다.
테라바이트당 월 비용 23$ 206$ 10X
가용성 99.99% 99.9% 10X
내구성 99.999999999% 99.9999% 10X
트랜잭션 쓰기 DBIO를 사용 YES HDFS가 좀 더 낫다.
aws cli를 이용해서 s3에 테스트용 버킷을 만들었다.
$ aws s3 mb s3://emr.joinc.co.kr
make_bucket: emr.joinc.co.kr
이 버킷에 분석할 파일과 매퍼,리듀서를 복사한다.
# aws s3 cp reduer.py s3://emr.joinc.co.kr/code/reducer.py
# aws s3 cp mapper.py s3://emr.joinc.co.kr/code/mapper.py
# aws s3 cp googlebooks-eng-all-1gram-20120701-x s3://emr.joinc.co.kr/input/NGramsX.txt

EMR 수행

AWS CLI로 EMR을 실행해보려 한다. 일반적인 사용 방법은 아래와 같다.
aws emr create-cluster --name "Test cluster" --ami-version 2.4 --applications Name=Hive Name=Pig \
--use-default-roles --ec2-attributes KeyName=myKey \
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=2,InstanceType=m4.large \
--steps Type=PIG,Name="Pig Program",ActionOnFailure=CONTINUE,Args=[-f,s3://mybucket/scripts/pigscript.pig,-p,INPUT=s3://mybucket/inputdata/,-p,OUTPUT=s3://mybucket/outputdata/,$INPUT=s3://mybucket/inputdata/,$OUTPUT=s3://mybucket/outputdata/]
하지만 옵션이 엄청나게 많고 번잡스럽기 때문에, 그냥 웹 콘솔로 만들기로 했다. 자동화를 위해서는 결국 aws cli를 사용해야 겠지만, 웹 콘솔에서 클러스터를 만들면 aws cli import로 명령을 확인 할 수 있기 때문에 문제될건 없다. EMR 대시보드로 이동하자.

 Create EMR Cluster

 Create EMR Cluster

Create Cluster를 클릭한다.

 클러스터 설정

 클러스터 설정

 하드웨어 설정

 하드웨어 설정

Create cluster를 클릭하면 클러스터가 만들어진다.

 설정화면

 설정화면

클러스터를 만들고 나면, 상세 정보를 보여준다. AWS CLI export를 클릭하면, 이 클러스터를 만드는데 사용한 aws cli 명령을 확인 할 수 있다.
aws emr create-cluster --applications Name=Ganglia Name=Hadoop Name=Hive Name=Hue Name=Mahout Name=Pig Name=Tez --ec2-attributes '{"KeyName":"joinc_test","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-9e1502f6","EmrManagedSlaveSecurityGroup":"sg-0a9e006395552860f","EmrManagedMasterSecurityGroup":"sg-0a8012b8f0c91eb6f"}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-5.24.0 --log-uri 's3n://aws-logs-522373083963-ap-northeast-2/elasticmapreduce/' --name 'My cluster' --instance-groups '[{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"r3.xlarge","Name":"Core Instance Group"},{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"r3.xlarge","Name":"Master Instance Group"}]' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region ap-northeast-2
상당히 복잡하다.

클러스터는 만들어지는데 시간이 걸린다. 처음에는 Starting상태로 Waiting상태가 되야 비로서 클러스터를 사용 할 수 있다. 클러스터 규모에 따라서 달라질 수 있겠는데, 내 경우 클러스터가 준비되는데 8분 정도의 시간이 걸렸다.
$ aws emr list-clusters --active
{
    "Clusters": [
        {
            "Id": "j-3KGZHWMIER9RH",
            "Name": "My cluster",
            "Status": {
                "State": "STARTING",
                "StateChangeReason": {
                    "Message": "Configuring cluster software"
                },
                "Timeline": {
                    "CreationDateTime": 1561041365.074
                }
            },
            "NormalizedInstanceHours": 0
        }
    ]
}
$ aws emr list-clusters --active
{
    "Clusters": [
        {
            "Id": "j-3KGZHWMIER9RH",
            "Name": "My cluster",
            "Status": {
                "State": "WAITING",
            // .... 생략 
        }
    ]
}
WAITING 상태다. 준비완료. 이제 작업을 실행하면 된다.

 Add Step

 Add Step

클러스터의 Steps 탭에서 Add step를 클릭한다.

 Add Step 상세

 Add Step 상세
작업이 완료됐는지 확인해보자.
$ aws emr describe-step --cluster-id j-3KGZHWMIER9RH --step-id s-7B1F1Y7FEESB
{
    "Step": {
        "Id": "s-7B1F1Y7FEESB",
        "Name": "Streaming program",
        "Config": {
            "Jar": "command-runner.jar",
            "Properties": {},
            "Args": [
                "hadoop-streaming",
                "-files",
                "s3://emr.joinc.co.kr/code/mapper.py,s3://emr.joinc.co.kr/code/reducer.py",
                "-mapper",
                "mapper.py",
                "-reducer",
                "reducer.py",
                "-input",
                "s3://emr.joinc.co.kr/input/NGramsX.txt",
                "-output",
                "s3://emr.joinc.co.kr/output/"
            ]
        },
        "ActionOnFailure": "CONTINUE",
        "Status": {
            "State": "COMPLETED",
            "StateChangeReason": {},
            "Timeline": {
                "CreationDateTime": 1561042395.67,
                "StartDateTime": 1561042406.848,
                "EndDateTime": 1561042494.977
            }
        }
    }
}
성공했다. 결과파일이 잘 만들어졌는지 확인해 보자.
$ aws s3 ls s3://emr.joinc.co.kr/output/
2019-06-20 23:54:51          0 _SUCCESS
2019-06-20 23:54:42         97 part-00000
2019-06-20 23:54:42         55 part-00001
2019-06-20 23:54:43         45 part-00002
2019-06-20 23:54:43         90 part-00003
2019-06-20 23:54:48        123 part-00004
2019-06-20 23:54:51         97 part-00005
2019-06-20 23:54:51         76 part-00006
파일을 다운로드 해서, 원하는 결과가 나왔는지 검증해 보기로 했다.
$ aws s3 sync s3://emr.joinc.co.kr/output/ ./
$ cat * > tmp.txt 
$ cat tmp.txt| sort -k2,2n > result.txt
마지막으로 앞서 쉘 스크립트와 파이선 스크립트를 이용한 MR 결과 파일을(pymr.txt) 비교한다.
$ diff result.txt pymr.txt
100% 일치하는 걸 확인했다.

Hadoop streaming 란

Hadoop Streaming는 Hadoop 배포판과 함께 제공되는 유틸리티로, 빅데이터 분석을 위한 프로그램의 실행에 사용 할 수 있다. Hadoop 스트리밍은 Python, Java, PHP, Scala, Perl 등 많은 언어로 개발이 가능하다. 다양한 언어를 사용 할 수 있는 이유는 맵퍼와 리듀서가 표준입력과 표준출력을 이용해서 데이터를 입/출력 하기 때문이다.

Amazon EMR에서의 S3N, S3A, S3 사용

... 정리 중

정리

참고