컴퓨터 지식 네트워크 - 컴퓨터 백과사전 - CDH5에서 Spark 애플리케이션을 실행하는 방법

CDH5에서 Spark 애플리케이션을 실행하는 방법

Maven 프로젝트 생성

일반 Maven 프로젝트를 생성하려면 다음 명령을 사용하십시오:

bash

$ mvn archetype:generate - DgroupId= com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

sparkwordcount 디렉터리 이름을 simplesparkapp으로 바꾼 다음 simplesparkapp 디렉터리 아래에 scala 소스 파일 디렉터리를 추가합니다.

p>

bash

$ mkdir -p Sparkwordcount/src/main/scala/com/cloudera/sparkwordcount

스칼라와 스파크를 추가하도록 pom.xml을 수정하세요. 종속성:

xml

<종속성>

<종속성>

org.scala-lang

scala-library

2.10.4

<종속성>

org.apache.spark

spark-core_2.10

< version>1.2.0 -cdh5.3.0

Scala를 컴파일하기 위한 플러그인 추가:

xml

org.scala-tools

maven-scala-plugin

<실행>

<실행>

<목표>

<목표>컴파일

testCompile

< /plugin>

Scala 컴파일 플러그인에 필요한 저장소를 추가하세요:

xml

scala-tools.org

Scala-tools Maven2 저장소

< url>/content/repositories/releases /

cloudera-repos

Cloudera Repos

/artifactory/cloudera-repos/

마지막으로 전체 pom.xml 파일은 /javachen/simplesparkapp/blob/master/pom.xml에서 찾을 수 있습니다.

다음 명령을 실행하여 프로젝트가 성공적으로 컴파일될 수 있는지 확인하세요:

bash

mvn package

샘플 코드 작성

p>

WordCount를 예로 들어 보겠습니다. 프로그램은 다음 논리를 완료해야 합니다.

입력 파일 읽기

각 단어의 발생 횟수를 계산합니다.

특정 횟수보다 적은 단어 필터링 단어

나머지 단어에서 각 문자의 발생 횟수 계산

MapReduce에서 위의 논리에는 두 개의 MapReduce 작업이 필요합니다. 하지만 Spark에서는 단 하나의 간단한 작업만 필요하며, 코드 양이 90% 감소합니다.

Scala 프로그램을 다음과 같이 작성하세요:

scala

import org.apache.spark.SparkContext

import org.apache.spark .SparkContext ._

import org.apache.spark.SparkConf

object SparkWordCount {

def main(args: Array[String]) {

val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

val Threshold = args(1).toInt

// 각각 분할 문서를 단어로 변환

val tokenized = sc.textFile(args(0)). flatMap(_.split(" "))

// 각 단어의 발생 횟수를 계산합니다.

val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

// 발생 횟수가 임계값 미만인 단어를 필터링합니다.

val filtered = wordCounts.filter(_._2 >= 임계값)

// 문자 수 계산

val charCounts =filtered.FlatMap(_._1.toCharArray).map((_, 1) ).reduceByKey(_ + _)

System.out.println(charCounts.collect().mkString(", "))

charCounts.saveAsTextFile("world- count- result")

}

}

Spark는 지연 실행 전략을 사용합니다. 즉, 작업이 실행될 때만 변환이 실행됩니다. 위 예의 작업 작업은 Collect 및 saveAsTextFile이며, 전자는 데이터를 클라이언트에 푸시하는 것이고 후자는 데이터를 HDFS에 저장하는 것입니다.

비교를 위해 프로그램의 Java 버전은 다음과 같습니다:

java

import java.util.ArrayList;

import java.util.배열;

org.apache.spark.api.java.* 가져오기;

org.apache.spark.api.java.function.*;

가져오기 p>

import org.apache.spark.SparkConf;

import scala.Tuple2;

public class JavaWordCount {

public static void main(String [] args) {

JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

최종 int 임계값 = Integer.parseInt(args[1 ]);

// 각 문서를 단어로 분할

JavaRDD tokenized = sc.textFile(args[0]). flatMap(

new FlatMapFunction() {

public Iterable call(String s) {

return Arrays.asList(s.split(" "));

}

}

);

// 각 단어의 발생 횟수를 계산합니다.

JavaPairRDD counts = tokenized.mapToPair(

new pairFunction () {

public Tuple2 call(String s) {

return new Tuple2(s, 1);

}

}

).reduceByKey(

new Function2() {

public Integer call(Integer i1, Integer i2) {

return i1 + i2;

}

}

);

또한 프로그램의 Python 버전은 다음과 같습니다.

python

sys 가져오기

pyspark에서 SparkContext 가져오기

file="inputfile.txt"

count=2

if __name__ == "__main__":

sc = SparkContext(appName="PythonWordCount")

line = sc.textFile(file, 1)

개수 = 라인 .FlatMap(lambda x: x.split(' ')) \

.map(lambda x: (x, 1)) \

.reduceByKey(lambda a, b: a + b) \

.filter(lambda (a, b) : b >= 개수) \

.FlatMap(lambda (a , b): 목록(a)) \

.map(lambda x: (x, 1)) \

.reduceByKey(lambda a, b: a + b)

print ", ".join(str(t) for t in counts.collect())

sc.stop()

컴파일

jar을 생성하려면 다음 명령을 실행하세요:

bash

$ mvn package

작업이 성공적으로 완료되면 Sparkwordcount-0.0.1-SNAPSHOT.jar 파일이 대상 디렉터리에 생성됩니다.

실행

프로젝트가 의존하는 스파크 버전은 1.2.0-cdh5.3.0이므로 다음 명령은 CDH 5.3 클러스터에서만 실행할 수 있습니다.

먼저 테스트 파일 inputfile.txt를 HDFS에 업로드하세요.

bash

$ wget /javachen/simplesparkapp/blob/master/data/inputfile.

$ hadoop fs -put inputfile.txt

두 번째로, Sparkwordcount-0.0.1-SNAPSHOT.jar을 클러스터의 노드에 업로드한 다음, Spark-submit 스크립트를 사용하십시오. 프로그램의 Scala 버전:

bash

$ Spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local Sparkwordcount-0.0.1-SNAPSHOT.jar inputfile .txt 2

또는 Java 버전의 프로그램을 실행하세요:

bash

$ Spark-submit --class com.cloudera.sparkwordcount.JavaWordCount - -master local Sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2

Python 버전 프로그램의 경우 실행 스크립트는 다음과 같습니다.

bash

$ Spark-submit - -master local PythonWordCount.py

클러스터가 독립형 모드로 배포된 경우 마스터 매개변수 값을 Spark://: 또는 Yarn의 모드가 실행될 수 있습니다.

Python 버전 프로그램의 최종 출력은 다음과 같습니다:

(u'a', 4),(u'c', 1),(u'b ', 1 ),(u'e', 6),(u'f', 1),(u'i', 1),(u'h', 1),(u'l', 1), (u' o', 2),(u'n', 4),(u'p', 2),(u'r', 2),(u'u', 1),(u't' , 2) ,(u'v', 1)

上篇: 심천에서 돈을 버는 법 下篇: 보증 기간 동안 노트북의 먼지를 제거하려면 청구서가 필요합니까? 얼마입니까?
관련 내용