当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

easychart

拥有积分:4
这家伙太懒,还没有签名!

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

Spark 快速入门--编程及运行

easychart 发表于 2年前 (2014-10-31 20:13:50)  |  评论(0)  |  阅读次数(1489)| 0 人收藏此文章,   我要收藏   

Apache Spark 快速入门

本指南提供了使用Spark的快速介绍. 通过Spark的交互式Scala shell脚本 (不要担心你不知道Scala –对此不需要知道太多),我们将会对API有个初步认识,然后会介绍如何使用 Scala, Java, 和Python编写独立的应用程序. 查看programming guide会得到完整的参考手册.

千里之行始于足下, 此刻我们只需要在机器上构建Spark即可. 进入Spark目录然后运行:

$ sbt/sbt assembly

使用Spark Shell的交互分析

基础

Spark的交互式shell脚本提供了一条学习API的捷径,同时也是用于交互式分析数据集的强大工具。在Spark目录下运行 ./bin/spark-shell 即可进入脚本.

Spark的基本抽象是名为弹性分布式数据集(Resilient Distributed Dataset,即RDD)的分布式集合对象. RDDs 可以来自 Hadoop InputFormats (例如 HDFS files) ,或者通过其他RDDs转换得到. 让我们从Spark源文件目录的README文件建立一个新RDD:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDDs 具有许多 actions, 这些actions可以返回values和 transformations(返回新的RDDs). 这是几个简单的actions:

scala> textFile.count() // Number of items in this RDD
res0: Long = 74

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

现在我们来执行一个 transformation. 使用一个 filter transformation 过滤原始文件, 以返回一个包含原始文件子集的新的 RDD.

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

也可以连起来使用 transformations 和 actions:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多的RDD操作

RDD actions 和 transformations 可以用于许多复杂计算. 例如要找出一行中单词最多的行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 16

第一步做了一个line到int数值的map, 建立了一个新的 RDD. 然后在这个RDD中调用 reduce 找到最大的行中单词数值. mapreduce的参数是 Scala 函数常量 (闭包), 其中可以使用 Scala/Java 库的语言特性. 例如, 可以调用在别处声明的函数. 使用 Math.max() 函数可以让刚才的例子变的简单和易于理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 16

一个常见的数据流模式是因Hadoop而流行的MapReduce. Spark可以非常简单的实现MapReduce流程:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(java.lang.String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

这里,通过组合 flatMapmap 和 reduceByKey transformations 来计算文件中每个单词的数目,返回一个(String, Int) 数据对的 RDD对象 . 为了在脚本中收集单词的数量, 可以使用 collect action:

scala> wordCounts.collect()
res6: Array[(java.lang.String, Int)] = Array((need,2), ("",43), (Extra,3), (using,1), (passed,1), (etc.,1), (its,1), (`/usr/local/lib/libmesos.so`,1), (`SCALA_HOME`,1), (option,1), (these,1), (#,1), (`PATH`,,2), (200,1), (To,3),...

缓存

Spark支持将数据集加入集群范围的内存缓存. 这在数据集频繁存取的时候尤其有用, 比如要查询一个小的热点数据集或是执行像PageRank这样的迭代算法. 下面把 linesWithSpark 数据集做个缓存:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

用Spark显示和缓存一个30行的文本看起来有点傻. 有意思的在于可以用同样的函数处理很大的数据集, 即使他们分布在成千上万的节点上. 通过 bin/spark-shell 链接到集群后即可交互式的使用这些函数, 详情参见 programming guide.

使用Scala编写的单独程序

Now say we wanted to write a standalone application using the Spark API. We will walk through a simple application in both Scala (with SBT), Java (with Maven), and Python. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.

We’ll create a very simple Spark application in Scala. So simple, in fact, that it’s named SimpleApp.scala:

/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in the Spark README. Note that you’ll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the proogram. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the application, the directory where Spark is installed, and a name for the jar file containing the application’s code. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes.

This file depends on the Spark API, so we’ll also include an sbt configuration file, simple.sbt which explains that Spark is a dependency. This file also adds a repository that Spark depends on:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

If you also wish to read data from Hadoop’s HDFS, you will also need to add a dependency on hadoop-client for your version of HDFS:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "<your-hdfs-version>"

Finally, for sbt to work correctly, we’ll need to layout SimpleApp.scala and simple.sbt according to the typical directory structure. Once that is in place, we can create a JAR package containing the application’s code, then use sbt/sbt run to execute our program.

$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

$ sbt/sbt package
$ sbt/sbt run
...
Lines with a: 46, Lines with b: 23


使用Java编写的单独程序

现在我们使用Java API来编写一个独立的程序. 使用Maven一步一步搞定. 如果你使用了其他的构建系统, 请参考开发指南中 Spark 集合JAR的部分.

下面来写一个非常简单的Spark程序, SimpleApp.java:

/*** SimpleApp.java ***/
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    JavaSparkContext sc = new JavaSparkContext("local", "Simple App",
      "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"});
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

这个程序计算了文件中包含字符 ‘a’的行数和包含字符 ‘b’的函数. 注意这里需要把 $YOUR_SPARK_HOME 替换为Spark的安装路径. 同 Scala 的样例一样, 需要初始化 SparkContext, 这里使用了一个更为友好的 JavaSparkContext 类. 我们可以新建RDDs (表现为 JavaRDD) 并进行转换. 最后,传递函数使用了继承spark.api.java.function.Function的构建类.  Java编程指南 描述了这些细节上的不同.

可以编写Maven的 pom.xml 文件将Spark作依赖. 请注意 Spark artifacts 中标记了 Scala 的版本.

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>0.9.0-incubating</version>
    </dependency>
  </dependencies>
</project>

如果同时需要从 Hadoop的 HDFS中读取数据, 则需要为所使用版本的 HDFS添加 hadoop-client 依赖:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>...</version>
</dependency>

其文件结构遵循典型的Maven文件架构:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,你可以通过Maven来执行这个程序:

$ mvn package
$ mvn exec:java -Dexec.mainClass="SimpleApp"
...
Lines with a: 46, Lines with b: 23

使用Python编写的单独程序

Now we will show how to write a standalone application using the Python API (PySpark).

As an example, we’ll create a simple Spark application, SimpleApp.py:

"""SimpleApp.py"""
from pyspark import SparkContext

logFile = "$YOUR_SPARK_HOME/README.md"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print "Lines with a: %i, lines with b: %i" % (numAs, numBs)

This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in a text file. Note that you’ll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala and Java examples, we use a SparkContext to create RDDs. We can pass Python functions to Spark, which are automatically serialized along with any variables that they reference. For applications that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the Python programming guideSimpleApp is simple enough that we do not need to specify any code dependencies.

We can run this application using the bin/pyspark script:

$ cd $SPARK_HOME
$ ./bin/pyspark SimpleApp.py
...
Lines with a: 46, Lines with b: 23

在集群上运行

这有一些额外的注意事项,当程序运行在一个 SparkYARN, 或者 Mesos 集群上.

加入依赖包

当你的程序依赖于很多其他库的时候, 你需要确保在 slave nodes中也引入了这些包. 一个流行的方法是建立一个集合所有代码和依赖库的jar包. sbt 和Maven 都有提供集合功能的插件. 当构建集合jars时, 列出Spark作为一个已提供的依赖; 在slaves中存在就不需要捆绑了. 一旦你有了一个集合包, 将他加入SparkContext.你也可以用SparkContext的addJar方法一步步添加你的依赖包.

对于Python, 可以使用SparkContext的pyFiles参数 或 addPyFile方法添加 .py.zip or .egg 等文件到分布式系统.

配置选项

Spark 包含若干可以改变程序行为的配置选项( configuration options )。这需要构建一个 SparkConf 对象到SparkContext 构造器. 例如, 在Java 和Scala 环境下, 你可以这样写:

import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
             .setMaster("local")
             .setAppName("My application")
             .set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)

使用Python可以这样写:

from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("My application")
conf.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

 在Hadoop Filesystems中存取数据

这个例子是存取一个本地文件. 如果要在一个分布式系统(例如HDFS)中读取数据,需要在你的构建文件中包含 Hadoop版本信息. Spark默认基于HDFS 1.0.4构建.


分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183