Apache Spark에 대한 자세한 설명은 저번 글에서 설명한 적이 있다.
오늘은 강의를 들으며 배운 Spark를 정리하도록 하겠다.
구성은 Spark에 대한 간단한 설명과 Hortonworks로 진행한 실습을 정리한다.
[ 목차 ]
https://hyem207.tistory.com/24
Spark
01 | Spark란 ?
Spark란
" A fast and general engine for large-scael data processing"
- 대규모 데이터를 처리하는 빠른 엔진
자바, 스칼라, 파이썬 등의 프로그래밍 언어 지원하여 스크립트 작성함
스파크는 머신러닝 과 같은 복잡한 작업도 가능하며, 데이터 마이닝, 그래프 분석, 스트리밍 데이터도 다룰 수 있다.
무엇보다도 속도도 빠르고 확장 가능한 강력한 프레임워크다.
"맵 리듀스와 비교시, 메모리 내에서 작동시 100배, 디스크로 접근 시에는 10배 더 빨라짐"
"또한 맵리듀스는 제한적이지만, 스파크는 좀 더 유연하게 사용 가능함"
"핫한 기술"
- Amazon, Ebay, Yahoo 등에서 사용
"데이터 세트를 나타내는 오브젝트인 RDD(Reilient Distributed Dataset)가 주 개념이다. "
- 새로운 RDD를 만들거나 수정/삭제/분석을 위해 RDD에서 여러 함수를 불러올 수 있다.
스파크 구성 요소
SPARK CORE를 중점/기반으로 Spark Streaming, Spark SQL, MLLib, GraphX가 있음
- Spark Streaming : 실시간 인풋 데이터를 분석함. ex. 실시간으로 로그를 수집하는 서버로부터 분석을 진행함.
- SQL 쿼리를 만들고, 스파크의 데이터 세트에 집중함
- MLLib : 머신러닝과 데이터 마이닝의 도구를 사용가능함.
- GraphX : 그래프로부터 데이터를 추출가능하도록 함 ?
RDD
Spark에서 발생하는 모든 것의 추상화로, 클러스터 전체에 걸쳐 작업이 고르게 분산되고, 오류에 탄력적으로 대처할 수 있도록 함
즉, RDD는 클러스터에서 자동으로 올바른 작업을 수행할 수 있는 객체의 키와 값 정보 or 일반적인 정보를 저장하는 일종의 방법이다. (프로그래밍 과점에서 RDD는 단순히 데이터 세트이다.)
Spark Context
드라이버 프로그램에서 만들어지는 것으로, spark shell이 있고, SC라는 객체를 만들 수 있다.
즉, 드라이버 프로그램이 Spark 내에서 실행되는 일종의 환경으로, 이것이? RDD를 만든다.
02 | 실습 준비
실습 설정을 위해 Hortonworks 가상머신을 키고, Ambari에 접속하여 admin 계정으로 로그인한다. (pw : admin)
※ 참고 : Ambari의 admin 계정을 잃어버려 재설정하기 원한다면, putty로 로그인하여 root 계정으로 들어간 뒤, ambari-admin-password-reset 명령어를 입력하면 된다.
실습에 앞서 스크립트 로그 설정을 바꿔준다.
: Spark2 > Config > log4j -properties > Category를 ERROR로 변경
그리고 Save 버튼 눌러 저장하고 restart 해준다.
실습에 사용되는 데이터 셋이 hadoop에 저장되어 있음을 file view로 확인한다.
03 | 실습
실습에 앞서 wget 해오는 곳은 강사님의 따로 올려주신 url로, 이 곳에서 데이터 셋과 코드를 다운로드 하였다.
해당 실습은 영화 평점 데이터 셋에서 가장 낮은 평점을 받은 영화들을 출력하는 것이다.
먼저 putty로 클러스터 접근하자.
1. 분석할 데이터 셋인 u.item 다운로드 하기
$ mkdir ml-100k
$ cd ml-100k
$ wget http://media.sundog-soft.com/hadoop/ml-100k/u.item
2. 코드 스크립트 다운르드하기
$ wget http://media.sundog-soft.com/hadoop/Spark.zip
$ unzip Spark.zip
less 명령어로 실행할 스크립트 내용이 맞는지 확인한다.
$ less LowestRatedMovieSpark.py
from pyspark import SparkConf, SparkContext
# This function just creates a Python "dictionary" we can later
# use to convert movie ID's to movie names while printing out
# the final results.
def loadMovieNames():
movieNames = {}
with open("ml-100k/u.item") as f:
for line in f:
fields = line.split('|')
movieNames[int(fields[0])] = fields[1]
return movieNames
# Take each line of u.data and convert it to (movieID, (rating, 1.0))
# This way we can then add up all the ratings for each movie, and
# the total number of ratings for each movie (which lets us compute the average)
def parseInput(line):
fields = line.split()
return (int(fields[1]), (float(fields[2]), 1.0))
if __name__ == "__main__":
# The main script - create our SparkContext
conf = SparkConf().setAppName("WorstMovies")
# Load up our movie ID -> movie name lookup table
movieNames = loadMovieNames()
# Load up the raw u.data file
lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
# Convert to (movieID, (rating, 1.0))
movieRatings = lines.map(parseInput)
# Reduce to (movieID, (sumOfRatings, totalRatings))
ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: ( movie1[0] + movie2[0], movie1[1] + movie2[1] ) )
# Map to (rating, averageRating)
averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])
# Sort by average rating
sortedMovies = averageRatings.sortBy(lambda x: x[1])
# Take the top 10 results
results = sortedMovies.take(10)
# Print them out:
for result in results:
print(movieNames[result[0]], result[1])
3. 스파크 스크립트 실행하기
spark-submit으로 스파크 환경을 설정하고 스파크 스크립트를 실행시킨다.
$ spark-submit LowestRatedMovieSpark.py
가장 낮은 평점을 받은 영화를 출력한 결과이다.
Spark SQL
01 | SparkSQL이란
스파크는 행으로 구성된 RDD를 데이터 프레임 객체로 확장하고 데이터 프레임으로 작업을 한다.,
데이터 프레임은
- Row 객체를 포함
- sql 쿼리 사용
- 스키마를 갖고 있음
- JSON, Hive, Parquet으로 읽고 쓸 수 있음
- JDBC와 커뮤니케이션 가능함
Spark 2.0 에서는 Dataframe과 상반되는 Dataset이 있음
데이터 프레임은 행 객체의 데이터 세트이고 데이터 세트는 일반적인 용어 (행뿐만 아니라 모든 정보를 포함할 수 있음)
- 런타임동안 발생하는 컴파일 시간 오류시간을 제공함? 는 점에서 더 장점임
- 파이썬에서는 별로 차이 없는 데 스칼라와 자바에서는 차이가 존재하기에 알아둬야 함
- 데이터터베이스 서버를 실행하여 연결해서 쿼리를 실행할 수있음
- 확장성. 사용자 정의 함수를 만들 수 있음 . SQL에 연결하고 자신만의 함수를 만들 수 있음. SQL 쿼리에서 사용가능함
또한 스파크 2에서 머신 러닝 라이브러리나 스파크 스트리밍 라이브러리, API를 근간으로 하는 데이터 세트를 가지고 있습니다. (데이터 세트는 다른 시스템 사이에서 데이터를 보내는 공통분모입니다.)
스파크 2에서 데이터 세트를 이용한 작업 이점뿐 아니라 스파크를 기반으로 구축된 다른 기능을 사용하는 더 쉬운 방법을 얻어 흥미로운 방식으로 혼합하고 일치시킬 수 있습니다.
02 | 실습 전_코드 리뷰
spark2의 dataframe을 이용한 실습 / 평점이 낮은 영화들을 찾는 실습
# LowestRatedMovieDataFrame.py
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
def loadMovieNames():
movieNames = {}
with open("ml-100k/u.item") as f:
for line in f:
fields = line.split('|')
movieNames[int(fields[0])] = fields[1]
return movieNames
def parseInput(line):
fields = line.split()
return Row(movieID = int(fields[1]), rating = float(fields[2]))
if __name__ == "__main__":
# Create a SparkSession (the config bit is only for Windows!)
# Spark 2.0 문법으로, spark 세션을 생성함
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
# Load up our movie ID -> name dictionary
movieNames = loadMovieNames()
# Get the raw data
# 새로운 RDD로 업로드
lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
# Convert it to a RDD of Row objects with (movieID, rating)
movies = lines.map(parseInput)
# Convert that to a DataFrame
# RDD를 데이터프레임으로 변환
movieDataset = spark.createDataFrame(movies)
# Compute average rating for each movieID -> 영화 ID와 rating 칼럼으로 구성된 데이터 셋
averageRatings = movieDataset.groupBy("movieID").avg("rating")
# Compute count of ratings for each movieID
counts = movieDataset.groupBy("movieID").count()
# Join the two together (We now have movieID, avg(rating), and count columns)
averagesAndCounts = counts.join(averageRatings, "movieID")
# Pull the top 10 results
topTen = averagesAndCounts.orderBy("avg(rating)").take(10)
# Print them out, converting movie ID's to names as we go.
for movie in topTen:
print (movieNames[movie[0]], movie[1], movie[2])
# Stop the session
spark.stop()
IMPORT
- SparkSession : Spark 2.0에서 쓰이는 것으로 Spark와 SQL 세션을 제공함.
SPARK에서 Spark 세션 개체를 생성하고 스크립트를 통해 세션을 실행시킨 후 SQL 쿼리를 실행할 수 있으며,원하는 모든 작업을 완료하고 중단하면 종료됩니다. - Row : 행 오브젝트의 데이터 프레임과 SQL 함수를 만들 수 있습니다.
- Function : 평균같은 것을 계산할 수있음
02 | 실습
실습에 앞서 스파크 스크립트와 분석할 데이터 셋이 있는지 확인한다.
Spark2 를 사용하기 위해 환경변수를 설정한다.
$ export SPARK_MAJOR_VERSION=2
spark-submit 명령어로 스크립트를 실행한다.
$ spark-submit LowestRatedMovieDataFrame.py
MLLib 이용한 영화 추천 실습
01 | 실습 전 _ 코드 리뷰
전체 userId, rating등의 칼럼으로 이루어진 데이터셋을 이용하여 특정 사용자의 영화 rating을 예측해보자.
해당 예제는 userId가 0인 사용자의 평점 데이터셋으로 모델을 학습한 후, 전체적으로 100번 이상 rating된 영화를 user 0의 경우 어떻게 평가할 지 예측하는 예제이다.
참고로 userID가 0인 데이터는 원본 데이터셋에서 따로 추가한 것으로, 실습을 위해선 u.data 파일에 데이터를 추가해주어야 한다. (아래 실습에 과정 첨부함)
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit
# lit : 행들에 상수를 넣을 수 있음
# Load up movie ID -> movie name dictionary
def loadMovieNames():
movieNames = {}
with open("ml-100k/u.item") as f:
for line in f:
fields = line.split('|')
movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
return movieNames
# Convert u.data lines into (userID, movieID, rating) rows
def parseInput(line):
fields = line.value.split()
return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = float(fields[2]))
if __name__ == "__main__":
# Create a SparkSession (the config bit is only for Windows!)
spark = SparkSession.builder.appName("MovieRecs").getOrCreate()
# This line is necessary on HDP 2.6.5:
spark.conf.set("spark.sql.crossJoin.enabled", "true")
# Load up our movie ID -> name dictionary
movieNames = loadMovieNames()
# Get the raw data
# 앞에서는 sparkContext를 이용해서 가져왔지만, 이 예제는 spark.read.text 이용함
# return값이 dataframe이라 .rdd로 RDD데이터로 만든다.
lines = spark.read.text("hdfs:///user/maria_dev/ml-100k/u.data").rdd
# Convert it to a RDD of Row objects with (userID, movieID, rating)
ratingsRDD = lines.map(parseInput)
# Convert to a DataFrame and cache it
# Dataframe으로 바꾸고, 이 Dataframe을 한 번 이상 사용하기 위해 cache()를 이용한다.
ratings = spark.createDataFrame(ratingsRDD).cache()
# Create an ALS collaborative filtering model from the complete data set
# ALS를 train/test의 머신러닝 개념임.
als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
model = als.fit(ratings)
# Print out ratings from user 0:
# userID가 0인 평점만 추출하여, userRatings 데이터 프레임에 저장하고,
# for문으로 결과를 출력한다.
print("\nRatings for user ID 0:")
userRatings = ratings.filter("userID = 0")
for rating in userRatings.collect():
print movieNames[rating['movieID']], rating['rating']
# 100번 이상 레이팅 된 영화만을 기반으로, 20개의 추천 영화를 뽑는다.
print("\nTop 20 recommendations:")
# Find movies rated more than 100 times
ratingCounts = ratings.groupBy("movieID").count().filter("count > 100")
# Construct a "test" dataframe for user 0 with every movie rated more than 100 times
# 이때 2개의 칼럼을 가진 새 데이터 프레임을 뽑는데, 하나는 movieID이고, 다른 하나는 userId가 0인것으로, user 0일 경우 어떻게 평가할지를 추측한다.
popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(0))
# Run our model on that list of popular movies for user ID 0
# 데이터 프레임을 model에 전달
recommendations = model.transform(popularMovies)
# Get the top 20 movies with the highest predicted rating for this user
# 평점을 내림차순으로 정렬하여 최고 20개만 남기고 가져온다.
topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)
# 출력
for recommendation in topRecommendations:
print (movieNames[recommendation['movieID']], recommendation['prediction'])
spark.stop()
02 | 실습
실습을 위해 userID가 0인 데이터 원본 데이터셋(u.data)에 따로 추가하자. (위에서 3줄 추가함)
그런 다음 수정한 데이터 셋을 Hadoop에 올린다.
MLLib을 위해서는 numpy가 필요하다. pip명령어로 1.16 버전을 다운로드한다.
sudo pip install numpy==1.16
spark 스크립트가 있는지 확인하고, Spark2 를 사용하기 위해 환경변수를 설정한다.
spark-submit 명령어로 스크립트를 실행한다.
spark-submit MovieRecommendationsALS.py
결과를 보면 위 쪽에 user 0이 기존에 평가한 영화와 평점이 띄워지고, 아래는 top 20인 영화를 기준으로 추천된 영화이다.
다만 이미 평가한 영화들은 걸러지지 않고 나왔다. 이를 개선해보자.
...
LowestRatedMovieDataFrame.py (개선)
가장 낮은 평점의 영화를 찾는데, 최소 10명 이상이 평가를 준 영화만을 분석 대상으로 한다.
(이유 : 한 사람만의 낮은 평점을 줬다고 해서 그 영화의 전체 평점을 낮출 순 없음.)
'#️⃣ Data Engineering > Spark' 카테고리의 다른 글
[Spark] Spark Streaming - Twitter 해시태그 분석 (1) (0) | 2022.10.07 |
---|---|
[Spark] 이벤트시간 윈도우, 워터마크 (0) | 2022.10.06 |
[Spark] Spark란 + 실습 (추가 정리) (0) | 2022.10.06 |
[Spark] Ubuntu 21.04에 Spark 설치하기 (0) | 2022.03.14 |
[스파크 완벽 가이드] CH.1_아파치 스파크란 (0) | 2022.02.06 |