spark 활용해 보기

우리는 저번 포스팅에 스팍을 설치하고 간단하게 알아봤다.

이번시간에는 간단하게 활용을 해서 해보겠다. 그리고 스칼라도 조금알아야 된다. 필자도 조금밖에 모른다. 물론 자바를 알아도 되긴 하지만 스팍은 스칼라로 해야 뭔가 한듯하다. 자바로하면 소스도 길고 보기도 별루 안좋다.

http://grouplens.org/datasets/movielens

위의 사이트의 가면 영화 데이터를 다운로드 받을 수 있다. 위의 데이터로 진행해 보겠다.
일단 스팍을 기동한 후에 다운로드 받을 파일을 적당한 곳에 압축을 푼다.

우리는 ratings.csv 파일으로 할 예정이다.
다운받을 폴더에 README.txt 를 열어 보면 아래와 같이 나와있다.

Ratings Data File Structure (ratings.csv)
-----------------------------------------

All ratings are contained in the file `ratings.csv`. Each line of this file after the header row represents one rating of one movie by one user, and has the following format:

    userId,movieId,rating,timestamp

The lines within this file are ordered first by userId, then, within user, by movieId.

Ratings are made on a 5-star scale, with half-star increments (0.5 stars - 5.0 stars).

순서는 userId, movieId, rating, timestamp 순으로 되어있다고 한다.
시작해보자

scala> val dataSet = sc.textFile("/Users/wonwoo/Downloads/ml-20m/ratings.csv")
dataSet: org.apache.spark.rdd.RDD[String] = /Users/wonwoo/Downloads/ml-20m/ratings.csv MapPartitionsRDD[3] at textFile at <console>:27

우선 파일을 읽어서 dataSet이라는 변수에 넣어두었다.

scala> dataSet.first()
res0: String = userId,movieId,rating,timestamp

위와 같이 첫번째에는 userId,movieId,rating,timestamp 라는 텍스트가 존재한다. 우리는 이것이 필요 없다. 제거 해줘야 한다.

scala> val header = dataSet.first()
header: String = userId,movieId,rating,timestamp

scala> val data = dataSet.filter(_ != header).map(_.split(","))
data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:31

첫번째는 헤더라는 곳에 저장해두고 헤더가 아니면 전부 data라는 변수에 넣었다.

scala> data.first()
res4: Array[String] = Array(1, 2, 3.5, 1112486027)

그리고 다시 확인해보니 헤더부분이 사라졌다.

우리는 랭킹만을 모아서 랭킹들의 개수를 찾고 싶다.

scala> val ratings = data.map(_(2))
ratings: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at map at <console>:33

이렇게 하면 랭킹만이 모아진 RDD가 된다. 3번째가 (배열은2번째) 랭킹 컬럼이므로 저렇게 했다.

scala> ratings.countByValue()
res6: scala.collection.Map[String,Long] = Map(4.5 -> 1534824, 0.5 -> 239125, 3.0 -> 4291193, 3.5 -> 2200156, 2.0 -> 1430997, 1.5 -> 279252, 4.0 -> 5561926, 1.0 -> 680732, 2.5 -> 883398, 5.0 -> 2898660)

countByValue를 하면 각각 평점별루 개수들이 보여진다.
http://spark.apache.org/docs/latest/programming-guide.html
위의 사이트를 가보면 Transformations, Actions API 들이 나와있다.
실제 자바8을 Stream을 알면 어느정도는 아는 내용이다.

우리는 아주 간단하게 스팍의 활용에 대해 알아봤다. 스칼라도 조금더 공부해야겠다.

spark 설치 및 간단하게 보자

spark을 설치 해보자
간단하게 설치하고 실행만 시켜보자.
mac 기준으로 설치를 진행하겠다. 깔기만 하면 되긴 하는데..설치할 것도 없다.
spark

위와 같은 설정으로 다운받자.
원하는 곳에 압축을 푼후 ${SPARK_HOME}/bin 으로 가서 아래와 같이 실행 시키자!

./spark-shell

spark1

그럼 위와 같이 실행 될 것이다.
이 spark-shell을 실행 시키면 sparkContext가 생성 된다. 로그를 보면 확인 할 수 있다.

Spark context available as sc.

이렇게 말이다.

그리고 webUI도 함께 올라간다.
http://localhost:4040 으로 접속 해보자!

일단 설치가 완료 되었다.
이번 시간에는 간단하게만 알아보자

scala> val textFile = sc.textFile("/Users/wonwoo/Documents/spark-1.6.1-bin-hadoop2.6/README.md")
textFile: org.apache.spark.rdd.RDD[String] = /Users/wonwoo/Documents/spark-1.6.1-bin-hadoop2.6/README.md MapPartitionsRDD[19] at textFile at <console>:27

아까 말했듯이 sc는 shell을 실행 시킬때 자동으로 올라간다.

scala> textFile.count()
res15: Long = 95

파일의 RDD의 개수이다. 지금버전의 README.md는 95줄이 있다.

scala> textFile.first()
res18: String = # Apache Spark

첫번째 RDD를 가져온다.
마지막으로 원하는 단어를 찾아보자.

scala> textFile.filter(line => line.contains("Spark")).count()
res19: Long = 17

그리고 나서 spark WEB UI에 접속해보자.
그러면 우리가 했던 작업들을 상세하게 볼 수 있다.
우리는 간단하게 스팍에 대해서 알아봤다.
다음 시간에는 좀더 재밌는 거를 해보자!