Spark Streaming을 이용한 실습을 찾아보던 중 twitter API를 이용하여 실시간 데이터를 받아와 Spark 로 해시태그를 분석해보는 예제를 찾아볼 수 있었다.
찾은 예제들은 Twitter API version 1을 이용하여 것들이 대부분이라 twitter 개발자 github예제의 twitter api version2의 코드를 참고하여 재구성하였다.
또한 spark 분석 코드를 변형하여 콘솔에 해시태그 집계 결과가 출력되도록 바꾸어 실습을 진행하였다.
📜 Spark Streaming으로 Twitter API의 해시태그 분석하기 : Twitter에서 kpop으로 필터링 한 데이터를 실시간으로 가져와 해시태그(#) 분석해보기
실습 결과 미리 보기
A. 준비단계
docker hub에서 실습 도커 이미지를 다운로드 한다.
A-1. 도커 이미지 설명
Spark와 Hadoop 설치 및 환경변수 설정 완료
Spark streaming을 위한 파이썬 파일들
: read_twitter.py 와 spark_twitter.py
A-2. 실습 파일 설명
(1) 구성
- read_twitter.py : twitter api에 연결하여 특정 카테고리 정보가 달린 트윗들을 socket으로 보냄 (like 데이터 보내는 서버)
- spark_twitter.py : read_twitter와 spark streaming의 socket과 연결됨. socket으로 받은 트윗 데이터들 중 sql 문을 이용해 해시태그만 골라와 테이블에 저장함 (2초 간격) (like 데이터 받아 분석하는 클라이언트)
(2) read_twitter.py 코드 리뷰
📌 Twitter API v2 를 이용해서 트윗 필터링한 결과 가져와, socket 연결로 spark 처리하는 client 소켓에 데이터 보내기
코드
import requests
import os
import json
import socket
# Twitter API Token인 bearer_token 불러오기
# export 'BEARER_TOKEN'='<your_bearer_token>' 필요
bearer_token = os.environ.get("BEARER_TOKEN")
# bearer_token 인증
def bearer_oauth(r):
"""
Method required by bearer token authentication.
"""
r.headers["Authorization"] = f"Bearer {bearer_token}"
r.headers["User-Agent"] = "v2FilteredStreamPython"
return r
# 스트림에 포함된 단일 혹은 규칙 목록을 반환함
def get_rules():
response = requests.get(
"https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth
)
if response.status_code != 200:
raise Exception( "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text))
print("|-------- get_rules --------|")
print(json.dumps(response.json()))
return response.json()
# 스트림에서 규칙 삭제
def delete_all_rules(rules):
if rules is None or "data" not in rules:
return None
ids = list(map(lambda rule: rule["id"], rules["data"]))
payload = {"delete": {"ids": ids}}
response = requests.post("https://api.twitter.com/2/tweets/search/stream/rules",auth=bearer_oauth, json=payload)
if response.status_code != 200:
raise Exception("Cannot delete rules (HTTP {}): {}".format(response.status_code, response.text)
print("|-------- delete_all_rules --------|")
print(json.dumps(response.json()))
# 규칙 삭제된 스트림에 다시 규칙 추가
def set_rules(delete):
# You can adjust the rules if needed
sample_rules = [{"value": "kpop has:images", "tag": "kpop"},]
payload = {"add": sample_rules}
response = requests.post("https://api.twitter.com/2/tweets/search/stream/rules",
auth=bearer_oauth,json=payload,)
if response.status_code != 201:
raise Exception("Cannot add rules (HTTP {}): {}".format(response.status_code, response.text) )
print("|-------- set_rules --------|")
print(json.dumps(response.json()))
# 스트림 조회 결과 불러오기
def get_stream(set, client_socket):
response = requests.get("https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True,)
print(response.status_code)
if response.status_code != 200:
raise Exception("Cannot get stream (HTTP {}): {}".format(response.status_code, response.text ))
print("|-------- get_stream --------|")
for response_line in response.iter_lines():
if response_line:
try:
json_response = json.loads(response_line)
print("\n\nOrigianl : " , json.dumps(json_response, indent=4, sort_keys=True))
msg = json.loads(response_line)
print("\n\nSend :" , msg['data']['text'].encode('utf-8') )
client_socket.send( msg['data']['text'].encode('utf-8') )
except BaseException as e :
print("Error on get_stream() : %s" %str(e))
# twitter API 연결 및 소켓 데이터 전송
def sendData(client_socket):
rules = get_rules()
delete = delete_all_rules(rules)
set = set_rules(delete)
get_stream(set, client_socket)
# main 문 - 소켓 연결 후 twitter API로 데이터 조회에 소켓에 데이터 전송함
if __name__ == "__main__":
server_socket = socket.socket() # 소켓 오브젝트 생성
host = "127.0.0.1"
port = 5050
server_socket.bind((host, port)) # 소켓에 ip주소와 포트 연결 (bind)
print("Listening on port: %s" % str(port))
server_socket.listen(5) # 클라이언트 소켓 연결 대기 (listen)
client_socket, addr = server_socket.accept() # 클라이언트 연결 승인 (accept)
print("Received request from: " + str(addr))
sendData(client_socket) # 클라이언트 소켓을 sendData -> get_stream()함수로 넘겨 데이터 send하도록 함
A. 메인 코드 설명 :
📌 클라이언트 소켓과 연결 후 twitter API로 데이터 필터링 한 결과를 소켓으로 전송함
B. 트위터 코드 설명 - Filtered stream :
공개된 트윗의 실시간 스트림을 필터링해서 가져옴.
주요 부분 - set_rules() 함수 중 sample_rules
- sample_rules는 필터링 룰을 작성한 json 파일로, 'twitter data' has:mentions (has:media OR has:links) (의미: 멘션된 ‘twitter data’ 를 필터링한다) 형식으로 작성하면 된다.
sample_rules = [{"value": "kpop has:images", "tag": "kpop"},]
# kpop 태그와, 이미지를 가진 kpop 트윗을 필터링함
자세한 룰 설명은 아래 링크에 첨부됨
https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
(3) read_twitter.py 코드 리뷰
코드
import os
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import col, split
spark = SparkSession.builder.appName("StructuredTwitterAnalysis").getOrCreate()
# 1. 입력 소스 지정
tweet_df = spark.readStream.format("socket").option("host", "127.0.0.1").option("port", 5050).load()
# 2. 작업 지정
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")
tweets_tab = tweet_df_string.withColumn('word', explode(split(col('value'), ' '))).groupBy('word').count().sort('count', ascending=False).filter(col('word').contains('#'))
# 3. 스트리밍 쿼리 시작 및 출력 모드 지정
writeTweet = tweets_tab.writeStream.outputMode("complete").format("console").queryName("tweetquery").trigger(processingTime='2 seconds').start().awaitTermination()
print("----- streaming is running -------")
# 결과 확인
spark.sql("select * from tweetquery").show()
코드 설명
Spark Streaming 기본 구조
💡 1. 입력 소스 지정 → 2. 쿼리 작업 지정 → 3. 스트리밍 쿼리 시작 및 출력 모드 등 지정
- 기본 과정
spark 작업을 위해 sparkSession 생성spark = SparkSession.builder.appName("StructuredTwitterAnalysis").getOrCreate()
1. 입력소스 지정
# 1
tweet_df = spark.readStream.format("socket").option("host", "127.0.0.1")
.option("port", 5050).load()
- readStream: 입력받은 스트리밍 소스를 streaming dataset 및 streaming dataframe로 생성함
- format("socket"): 소켓 연결에서 UTF 텍스트 데이터를 읽어옴.
- 수신하는 소켓은 spark 구조에서 ‘드라이버’에 위치함
- format("socket"): 소켓 연결에서 UTF 텍스트 데이터를 읽어옴.
📌 127.0.0.1의 5050포트에 연결하여 스트리밍 데이터를 한 줄씩 읽어와 dataframe을 생성한다.
2. 쿼리 작업 지정
# 1
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")
# 2
tweets_tab = tweet_df_string.withColumn('word', explode(split(col('value'), ' ') ))
.groupBy('word')
.count()
.sort('count', ascending=False)
.filter(col('word').contains('#'))
- selectExpr(표현식)
- CAST(value AS STRING): value 칼럼을 string형으로 형변환
- : dataframe에 SQL 사용할 수 있도록 함
- [ 1차 ] value 칼럼을 공백으로 split하여 word 칼럼으로 추가 withColumn(칼럼명, 칼럼) : dataframe에 새 칼럼을 추가하거나, 한 칼럼의 값을 변경할 때 사용
- explode(칼럼) : array나 map을 새 행으로 반환해줌
- split(칼럼, 구분자) : 문자열 칼럼을 ‘구분자’(ex. 공백, 쉼표 등) 를 기준으로 나누어 array 를 리턴함
- groupBy(칼럼).count() : (groupBy) 지정한 칼럼을 기준으로 데이터를 묶고, (count) groupBy로 묶은 데이터의 행 수를 카운트함
- sort('count', ascending=False) : 지정한 칼럼을 정렬함
- filter(조건식) : 조건에 맞는 데이터만 불러옴
- explode(칼럼) : array나 map을 새 행으로 반환해줌
📌 value 칼럼을 string 형변환 후, 공백을 기준으로 워드를 split하여 해시태그(#)만 필터를 걸어 그룹화한다.
3. 스트리밍 쿼리 시작 및 출력 모드 등 지정
writeTweet = tweets_tab.writeStream.outputMode("complete")
.format("console")
.queryName("tweetquery")
.trigger(processingTime='2 seconds')
.start()
.awaitTermination()
- writeStream : 스트리밍 쿼리 계산 출력을 위한 함수
- .outputMode("complete"): 출력 모드 3가지 중 complete 모드를 사용하여 모든 트리거 이후 전체 결과 테이블이 출력되도록 함
- .trigger(processingTime='2 seconds') : 2초 간격으로 결과를 출력함
- .queryName("tweetquery") : 쿼리 이름 지정
- .start() : 스트리밍 계산 시작
- .awaitTermination() : 쿼리 실행 중에 드라이버 프로세스가 종료되지 않도록 막음
📌 2초 간격으로 콘솔에 결과를 출력하도록 트리거와 출력모드를 설정한 후 스트리밍 계산을 시작한다.
4. 결과 확인
# 결과 확인
spark.sql("select * from tweetquery").show()
위에서 지정한 쿼리 이름 ’tweetquery'으로 집계된 결과를 조회한다.
코드를 짰으니 직접 실행해보자.
다음편에서 계속
참고
(Twitter API 연결 파트) Twitter API v2 공식 github
Twitter-API-v2-sample-code/filtered_stream.py at main · twitterdev/Twitter-API-v2-sample-code
(Spark 스트리밍 파트) structured-spark streaming
Easy to Play with Twitter Data Using Spark Structured Streaming
(소켓 연결 파트) dstream-spark-streaming
python-spark-streaming/TweetRead.ipynb at master · jleetutorial/python-spark-streaming
'#️⃣ Data Engineering > Spark' 카테고리의 다른 글
Colab으로 AWS S3연결하여 Spark작업하기 (0) | 2023.05.11 |
---|---|
[Spark] Spark Streaming - Twitter 해시태그 분석 (2) (0) | 2022.10.07 |
[Spark] 이벤트시간 윈도우, 워터마크 (0) | 2022.10.06 |
[Spark] Spark란 + 실습 (추가 정리) (0) | 2022.10.06 |
[Spark] Ubuntu 21.04에 Spark 설치하기 (0) | 2022.03.14 |