Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

소개

Apache Spark 개발 환경을 세팅한다. 나는 우분투 리눅스를 사용하고 있다. 모든 내용은 우분투 리눅스를 기준으로 한다.

개발 환경 세팅

Python 설치

우분투 리눅스 19.04를 사용하고 있다.
# cat /etc/issue
Ubuntu 19.04 \n \l

# uname -a
Linux yundream 5.0.0-40-generic #44-Ubuntu SMP Wed Jan 15 02:03:45 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

Spark 프로그래밍 언어로 python을 사용한다. 내 리눅스 운영체제에는 Python 2.7.16과 Python 3.7.3 버전이 설치돼 있다. 이 강의는 python 3를 기준으로 한다. virtualenv를 이용해서 spark 개발 환경을 세팅했다.
# virtualenv --python=/usr/bin/python3 virtualenvironment/spark
# source virtualenvironment/spark/bin/activate
(spark) # python
Python 3.7.3 (default, Oct  7 2019, 12:56:13) 
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> 

JAVA 환경 만들기

(2020년 3월)현재 내 운영체제에 설치된 jdk 버전은 openjdk-11 인데 제대로 작동하지 않는 걸로 확인했다. openjdk-8 환경을 설정했다.

# apt-get install openjdk-8-jdk  
# update-alternatives --config java
대체 항목 java에 대해 (/usr/bin/java 제공) 2개 선택이 있습니다.

  선택       경로                                          우선순위 상태
------------------------------------------------------------
* 0            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1101      자동 모드
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1101      수동 모드
  2            /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java   1081      수동 모드

Press <enter> to keep the current choice[*], or type selection number: 2 
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode

java 버전을 확인해보자.
# java -version
openjdk version "1.8.0_232"
OpenJDK Runtime Environment (build 1.8.0_232-8u232-b09-0ubuntu1~19.04.1-b09)
OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)
성공

Apache Spark 설치

https://spark.apache.org/downloads.html에서 다운로드 할 수 있다.

 Spark download page

Spark 3.0.0-preview2 버전이 있었으나, 안정버전으로 하는게 좋을 것 같아서 2.4.5 버전을 설치했다.
# mv spark-2.4.5-bin-hadoop2.7.tgz /usr/local
# cd /usr/local
# tar -xvzf spark-2.4.5-bin-hadoop2.7.tgz 
# ln -s spark-2.4.5-bin-hadoop2.7 spark
PATH를 설정한다. {{[#!plain # export SPARK_HOME=/usr/local/spark # export PATH=$PATH:$SPARK_HOME/sbin:$SPARK_HOME/bin }}}

Standalone master 서버를 실행해 보자. 이 서버는 8080 포트로 접근 할 수 있다.
# $SPARK_HOME/sbin/start-master.sh 
starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-yundream.out

웹브라우저로 접근해보자.

 Spark standalone Server

pyspark 설치

pip로 설치했다.
pip install pyspark
Collecting pyspark
  Downloading pyspark-2.4.5.tar.gz (217.8 MB)
     |████████████████████████████████| 217.8 MB 604 kB/s 
Collecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)

pyspark를 실행해보자.
# pyspark
Python 3.7.3 (default, Oct  7 2019, 12:56:13) 
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
20/03/11 00:14:04 WARN Utils: Your hostname, yundream resolves to a loopback address: 127.0.1.1; using 192.168.35.163 instead (on interface wlx002666491c41)
20/03/11 00:14:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/yundream/virtualenvironment/spark/lib/python3.7/site-packages/pyspark/jars/spark-unsafe_2.11-2.4.5.jar) to method java.nio.Bits.unaligned()
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/03/11 00:14:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Python version 3.7.3 (default, Oct  7 2019 12:56:13)
SparkSession available as 'spark'.
>>> 

애플리케이션이 잘 돌아가는지 테스트해보자.
>>> rdd = sc.textFile("README.md")
>>> rdd.count()
104

sc.textFile은 org.spach,spark.SparkContext클래스에서 제공하는 메서드로 텍스트 파일을 읽기 위해서 사용한다. 이 메서드는 로컬 파일 시스템, HDFS 등 다양한 저장소에 접근 할 수 있다.

count 메서드는 RDD로 부터 엘리먼트의 갯수를 반환한다. textFile은 파일의 내용을 읽어서 줄단위로 배열로 저장한다. 결과적으로 README.md 파일의 라인수를 리턴한다. wc로 확인을 해봤다.
# wc README.md
 104  485 3756 README.md

pyspark course 코드 다운로드

이 교육 과정의 코드들은 github에 공개돼있다.
# git clone https://github.com/petergdoyle/SparkCourse.git

테스트 데이터 셋 준비하기

코드를 만들기 전에 데이터를 준비하기로 했다. movielens.org 라는 영화등급 사이트가 있는데 여기에서 테스트에 사용 할 데이터를 준비해보자. 먼저 grouplens.org 사이트를 방문한다.

 Grouplens.org

GroupLens Research 는 미네소타 대학교의 컴퓨터 학부에 있는 연구실로 인간과 컴퓨터의 상호작용을 연구한다. 또한 GroupLends는 모바일 및 유비쿼터스 기술, 지리정보 시스템 등의 정보 시스템을 제공한다. GroupLens 랩은 유즈넷 기사 추천 엔진인 "GroupLens" 추천엔진을 개발했으며, 인기영화 추천 사이트인 MovieLens를 구축하여 자동 추천 시스템을 연구 한 최초의 실험실 중 하나다. GroupLens Research 홈페이지의 Featured Projects 섹션에 MovieLens를 확인 할 수 있다.

이후 사이트에 가입하면 추천 서비스를 이용 할 수 있다. 처음 로그인하면, 사용자의 취향을 확인하기 위한 좋아햐는 영화 장르를 선택하게 된다. 이 정보를 토대로 아래와 같이 영화를 추천한다.

 movielens.org

사용자는 추천된 각 영화에 별점을 줄 수 있다. 이 별점을 데이터를 이용해서 개인화된 영화 추천 서비스를 만든다.

다시 grouplens.org 홈으로 이동한다. 홈페이지 상단 메뉴에서 datasets를 클릭한다. 여기에서 테스트할 데이터셋을 다운로드 한다. 다양한 크기의 데이터셋이 있는데, 학습이 목적이므로 작은 데이터셋을 선택했다.
# mkdir -p ~/workspace/SparkCourse
# wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
# mv ml-100k.zip  ~/workspace/SparkCourse
# cd ~/workspace/SparkCourse 
# unzip ml-100k.zip

여기에는 1000명의 유저가 1700 개의 영화를 평가한 100,000 개의 점수를 가지고 있다. 평점은 1부터 5까지다.

pyspark 코드 실행

이제 각 평점대 별로 영화의 갯수를 pyspark로 계산을 할 것이다. 다운로드 한 스크립트 중 ratings-counter.py를 사용 할 것이다. 이 코드를 workspace/SparkCourse 로 복사했다. 코드의 내용을 보자.

from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf)

lines = sc.textFile("./u.data")
ratings = lines.map(lambda x: x.split()[2])
result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print ("%s %i" % (key, value))
SparkConf()를 이용해서 애플리케이션에서 필요한 SparkConf 객체를 생성한다.
  • setMaster(value) : 마스터 URL을 설정한다. 로컬파일 시스템을 사용하기 때문에 "local"를 설정했다.
  • setAppName(value) : 애플리케이션 이름을 설정한다.
SparkContext는 Spark 기능의 시작점이다. SparkContext는 Py4J를 사용해서 JVM을 시작하고 JavaSparkContext를 생성한다. PySpark에는 기본적으로 "sc"로 사용 가능한 SparkContext가 있으므로 new Sparkcontext는 작동하지 않는다.

 SparkContext

textFile로 읽을 텍스트 파일을 설정했다. u.data 파일 형식은 아래와 같다.
196 242 3   881250949
186 302 3   891717742
22  377 1   878887116
244 51  2   880606923
166 346 1   886397596                                    
298 474 4   884182806
115 265 2   881171488
......
관심있는 값은 3번째 필드에 있는 "평점"이다. 각 라인을 읽어서 whitespace 로 나눈 다음 배열의 두번째 값을 읽은 다음 이를 카운팅 하면 된다.
ratings = lines.map(lambda x: x.split()[2])
result = ratings.countByValue()
코드에 사용된 람다함수는 파라미터로 받은 문자열 "x"를 split() 메서드를 이용해서 나눈 다음 배열의 두번째 값(평점)을 반환한다. map은 가장 보편적인 transformation 함수다. map 함수는 sub 함수를 전달 할 수 있다. 입력된 모든 데이터에 대해서 sub 함수가 적용된다.

Spark는 TransformationAction 두가지 단계로 구성된다.
  • 1 단계 (Transformation) : 수행하고 싶은 operation으로 RDD를 transformation 한다.
  • 2 단계 (Action) : 선언된 RDD의 결과 값을 출력한다. 출력을 하기 위해서 collect, countByValue(), take() 등등의 함수를 사용 할 수 있다.
앞서 map으로 Transformation을 했는데, 그 결과를 countByValue로 수행해서 출력한다. 이 과정은 아래와 같이 묘사 할 수 있다.

 map & countByValue

pySpark 코드를 실행해 보자. spark-submit 스크립트는 응용 프로그램을 클러스터에서 제출하기 위해서 사용한다.
# spark-submit ratings-counter.py
20/03/16 00:43:31 WARN Utils: Your hostname, yundream resolves to a loopback address: 127.0.1.1; using 192.168.35.163 instead (on interface wlx002666491c41)
20/03/16 00:43:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/03/16 00:43:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/03/16 00:43:32 INFO SparkContext: Running Spark version 2.4.5
20/03/16 00:43:32 INFO SparkContext: Submitted application: RatingsHistogram
20/03/16 00:43:32 INFO SecurityManager: Changing view acls to: yundream
20/03/16 00:43:32 INFO SecurityManager: Changing modify acls to: yundream
20/03/16 00:43:32 INFO SecurityManager: Changing view acls groups to: 
20/03/16 00:43:32 INFO SecurityManager: Changing modify acls groups to: 
20/03/16 00:43:32 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yundream); groups with view permissions: Set(); users  with modify permissions: Set(yundream); groups with modify permissions: Set()
20/03/16 00:43:32 INFO Utils: Successfully started service 'sparkDriver' on port 45371.
20/03/16 00:43:32 INFO SparkEnv: Registering MapOutputTracker
20/03/16 00:43:32 INFO SparkEnv: Registering BlockManagerMaster
20/03/16 00:43:32 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
......
......
20/03/16 00:43:34 INFO DAGScheduler: ResultStage 0 (countByValue at /home/yundream/workspace/spark/ml-100k/ratings-counter.py:9) finished in 0.503 s
20/03/16 00:43:34 INFO DAGScheduler: Job 0 finished: countByValue at /home/yundream/workspace/spark/ml-100k/ratings-counter.py:9, took 0.533288 s
1 6110
2 11370
3 27145
4 34174
5 21201
......
......
각 평점별 집계결과를 확인 할 수 있다.