CDH5에서 Spark 애플리케이션을 실행하는 방법
Maven 프로젝트 생성
일반 Maven 프로젝트를 생성하려면 다음 명령을 사용하십시오:
bash
$ mvn archetype:generate - DgroupId= com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
sparkwordcount 디렉터리 이름을 simplesparkapp으로 바꾼 다음 simplesparkapp 디렉터리 아래에 scala 소스 파일 디렉터리를 추가합니다.
p>bash
$ mkdir -p Sparkwordcount/src/main/scala/com/cloudera/sparkwordcount
스칼라와 스파크를 추가하도록 pom.xml을 수정하세요. 종속성:
xml
<종속성>
<종속성>
<종속성>
< version>1.2.0 -cdh5.3.0
Scala를 컴파일하기 위한 플러그인 추가: p>
xml
<실행>
<실행>
<목표>
<목표>컴파일
< /plugin>
Scala 컴파일 플러그인에 필요한 저장소를 추가하세요:
xml
< url>/content/repositories/releases /
repositories>
마지막으로 전체 pom.xml 파일은 /javachen/simplesparkapp/blob/master/pom.xml에서 찾을 수 있습니다.
다음 명령을 실행하여 프로젝트가 성공적으로 컴파일될 수 있는지 확인하세요:
bash
mvn package
샘플 코드 작성
p>WordCount를 예로 들어 보겠습니다. 프로그램은 다음 논리를 완료해야 합니다.
입력 파일 읽기
각 단어의 발생 횟수를 계산합니다.
특정 횟수보다 적은 단어 필터링 단어
나머지 단어에서 각 문자의 발생 횟수 계산
MapReduce에서 위의 논리에는 두 개의 MapReduce 작업이 필요합니다. 하지만 Spark에서는 단 하나의 간단한 작업만 필요하며, 코드 양이 90% 감소합니다.
Scala 프로그램을 다음과 같이 작성하세요:
scala
import org.apache.spark.SparkContext
import org.apache.spark .SparkContext ._
import org.apache.spark.SparkConf
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
val Threshold = args(1).toInt
// 각각 분할 문서를 단어로 변환
val tokenized = sc.textFile(args(0)). flatMap(_.split(" "))
// 각 단어의 발생 횟수를 계산합니다. p>
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
// 발생 횟수가 임계값 미만인 단어를 필터링합니다.
val filtered = wordCounts.filter(_._2 >= 임계값)
// 문자 수 계산
val charCounts =filtered.FlatMap(_._1.toCharArray).map((_, 1) ).reduceByKey(_ + _)
System.out.println(charCounts.collect().mkString(", "))
charCounts.saveAsTextFile("world- count- result")
}
}
Spark는 지연 실행 전략을 사용합니다. 즉, 작업이 실행될 때만 변환이 실행됩니다. 위 예의 작업 작업은 Collect 및 saveAsTextFile이며, 전자는 데이터를 클라이언트에 푸시하는 것이고 후자는 데이터를 HDFS에 저장하는 것입니다.
비교를 위해 프로그램의 Java 버전은 다음과 같습니다:
java
import java.util.ArrayList;
import java.util.배열;
org.apache.spark.api.java.* 가져오기;
org.apache.spark.api.java.function.*;
가져오기 p>import org.apache.spark.SparkConf;
import scala.Tuple2;
public class JavaWordCount {
public static void main(String [] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
최종 int 임계값 = Integer.parseInt(args[1 ]);
// 각 문서를 단어로 분할
JavaRDD tokenized = sc.textFile(args[0]). flatMap(
new FlatMapFunction() {
public Iterable call(String s) {
return Arrays.asList(s.split(" "));
}
}
);
// 각 단어의 발생 횟수를 계산합니다.
JavaPairRDD counts = tokenized.mapToPair(
new pairFunction () {
public Tuple2 call(String s) {
return new Tuple2(s, 1);
}
}
).reduceByKey(
new Function2() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}
);
또한 프로그램의 Python 버전은 다음과 같습니다. p>
python
sys 가져오기
pyspark에서 SparkContext 가져오기
file="inputfile.txt"
count=2
if __name__ == "__main__":
sc = SparkContext(appName="PythonWordCount")
line = sc.textFile(file, 1)
개수 = 라인 .FlatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda a, b: a + b) \
.filter(lambda (a, b) : b >= 개수) \
.FlatMap(lambda (a , b): 목록(a)) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda a, b: a + b)
print ", ".join(str(t) for t in counts.collect())
sc.stop()
컴파일
jar을 생성하려면 다음 명령을 실행하세요:
bash
$ mvn package
작업이 성공적으로 완료되면 Sparkwordcount-0.0.1-SNAPSHOT.jar 파일이 대상 디렉터리에 생성됩니다.
실행
프로젝트가 의존하는 스파크 버전은 1.2.0-cdh5.3.0이므로 다음 명령은 CDH 5.3 클러스터에서만 실행할 수 있습니다.
먼저 테스트 파일 inputfile.txt를 HDFS에 업로드하세요.
bash
$ wget /javachen/simplesparkapp/blob/master/data/inputfile.
$ hadoop fs -put inputfile.txt
두 번째로, Sparkwordcount-0.0.1-SNAPSHOT.jar을 클러스터의 노드에 업로드한 다음, Spark-submit 스크립트를 사용하십시오. 프로그램의 Scala 버전:
bash
$ Spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local Sparkwordcount-0.0.1-SNAPSHOT.jar inputfile .txt 2
또는 Java 버전의 프로그램을 실행하세요:
bash
$ Spark-submit --class com.cloudera.sparkwordcount.JavaWordCount - -master local Sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2
Python 버전 프로그램의 경우 실행 스크립트는 다음과 같습니다.
bash
$ Spark-submit - -master local PythonWordCount.py
클러스터가 독립형 모드로 배포된 경우 마스터 매개변수 값을 Spark://
Python 버전 프로그램의 최종 출력은 다음과 같습니다:
(u'a', 4),(u'c', 1),(u'b ', 1 ),(u'e', 6),(u'f', 1),(u'i', 1),(u'h', 1),(u'l', 1), (u' o', 2),(u'n', 4),(u'p', 2),(u'r', 2),(u'u', 1),(u't' , 2) ,(u'v', 1)