Spark简介

Apache Spark是一个开源集群运算框架,最初是由加州大学柏克莱分校AMPLab所开发。相对于HadoopMapReduce会在运行完工作后将中介数据存放到磁盘中,Spark使用了存储器内运算技术,能在数据尚未写入硬盘时即在存储器内分析运算。Spark在存储器内运行程序的运算速度能做到比Hadoop MapReduce的运算速度快上100倍,即便是运行程序于硬盘时,Spark也能快上10倍速度。Spark允许用户将数据加载至集群存储器,并多次对其进行查询,非常适合用于机器学习算法。

使用Spark需要搭配集群管理员和分布式存储系统。Spark支持独立模式(本地Spark集群)、Hadoop YARNApache Mesos的集群管理。在分布式存储方面,Spark可以和HDFSCassandraOpenStack SwiftAmazon S3等接口搭载。 Spark也支持伪分布式(pseudo-distributed)本地模式,不过通常只用于开发或测试时以本机文件系统取代分布式存储系统。在这样的情况下,Spark仅在一台机器上使用每个CPU核心运行程序。

项目构成要素

Spark项目包含下列几项:

  1. Spark核心和弹性分布式数据集(RDDs) Spark核心是整个项目的基础,提供了分布式任务调度,调度和基本的I/O功能。而其基础的程序抽象则称为弹性分布式数据集(RDDs),是一个可以并行操作、有容错机制的数据集合。 RDDs可以透过引用外部存储系统的数据集创建(例如:共享文件系统、HDFS、HBase或其他 Hadoop 数据格式的数据源)。或者是透过在现有RDDs的转换而创建(比如:map、filter、reduce、join等等)。 RDD抽象化是经由一个以Scala, Java, Python的语言集成API所呈现,简化了编程复杂性,应用程序操纵RDDs的方法类似于操纵本地端的数据集合。
  2. Spark SQL Spark SQL在Spark核心上带出一种名为SchemaRDD的数据抽象化概念,提供结构化和半结构化数据相关的支持。Spark SQL提供了领域特定语言,可使用Scala、Java或Python来操纵SchemaRDDs。它还支持使用使用命令行界面和ODBC/JDBC服务器操作SQL语言。在Spark 1.3版本,SchemaRDD被重命名为DataFrame。
  3. Spark Streaming Spark Streaming充分利用Spark核心的快速调度能力来运行流分析。它截取小批量的数据并对之运行RDD转换。这种设计使流分析可在同一个引擎内使用同一组为批量分析编写而撰写的应用程序代码。
  4. MLlib MLlib是Spark上分布式机器学习框架。Spark分布式存储器式的架构比Hadoop磁盘式的Apache Mahout快上10倍,扩展性甚至比Vowpal Wabbit要好。MLlib可使用许多常见的机器学习和统计算法,简化大规模机器学习时间。
  5. GraphX GraphX是Spark上的分布式图形处理框架。它提供了一组API,可用于表达图表计算并可以模拟Pregel抽象化。GraphX还对这种抽象化提供了优化运行。 GraphX最初为加州大学柏克莱分校AMPLab和Databricks的研究项目,后来捐赠给Spark项目。

实验1. 用Spark计算PI的值

Spark附带了几个示例程序。 Scala,Java,Python和R示例位于examples/src/main目录中。 要运行其中一个Java或Scala示例程序,请在顶级Spark目录中使用bin/run-example params。下面介绍使用Spark计算PI的值。

进入Spark根目录,执行以下命令:

1
2
# cd /opt/spark/dist
# ./bin/run-example SparkPi 10

image-20230401171636876迭代10次以后得到PI的近似值

1
Pi is roughly 3.1393791393791393

Spark同时提供了Python版本的示例程序,通过以下命令执行PI计算:

1
# ./bin/spark-submit /opt/spark/dist/examples/src/main/python/pi.py

近似结果为

1
Pi is roughly 3.140280

image-20230401171651175

实验2. 在spark-shell中执行词频统计

2.1 编辑输入文件

本次实验统计某个文件中单词出现的次数,首先编辑输入文件。实训环境中已经提供了输入文件,路径为/root/input.txt,读者可根据需要自行编辑。文件的内容为:

hello world hello spark hello hadoop

实验2. 在spark-shell中执行词频统计

2.2 启动spark-shell

执行以下命令启动spark-shell

1
2
3
# cd /opt/spark/dist
# ./bin/spark-shell
scala>

image-20230401171705228

实验2. 在spark-shell中执行词频统计

2.4 词频统计

完整代码如下:

1
2
3
scala> val textFile = sc.textFile("file:///root/input.txt")
scala> val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
scala> wordCount.collect()

上面只给出了代码,省略了执行过程中返回的结果信息,因为返回信息很多。下面简单解释一下上面的语句。

textFile包含了多行文本内容,textFile.flatMap(line => line.split(” “))会遍历textFile中的每行文本内容,当遍历到其中一行文本内容时,会把文本内容赋值给变量line,并执行Lamda表达式line => line.split(” “)。line => line.split(” “)是一个Lamda表达式,左边表示输入参数,右边表示函数里面执行的处理逻辑,这里执行line.split(” “),也就是针对line中的一行文本内容,采用空格作为分隔符进行单词切分,从一行文本切分得到很多个单词构成的单词集合。这样,对于textFile中的每行文本,都会使用Lamda表达式得到一个单词集合,最终,多行文本,就得到多个单词集合。textFile.flatMap()操作就把这多个单词集合“拍扁”得到一个大的单词集合。

然后,针对这个大的单词集合,执行map()操作,也就是map(word => (word, 1)),这个map操作会遍历这个集合中的每个单词,当遍历到其中一个单词时,就把当前这个单词赋值给变量word,并执行Lamda表达式word => (word, 1),这个Lamda表达式的含义是,word作为函数的输入参数,然后,执行函数处理逻辑,这里会执行(word, 1),也就是针对输入的word,构建得到一个tuple,形式为(word,1),key是word,value是1(表示该单词出现1次)。

程序执行到这里,已经得到一个RDD,这个RDD的每个元素是(key,value)形式的tuple。最后,针对这个RDD,执行reduceByKey((a, b) => a + b)操作,这个操作会把所有RDD元素按照key进行分组,然后使用给定的函数(这里就是Lamda表达式:(a, b) => a + b),对具有相同的key的多个value进行reduce操作,返回reduce后的(key,value),比如(“hello”,1)和(“hello”,1),具有相同的key,进行reduce以后就得到(“hello”,2),这样就计算得到了这个单词的词频。

image-20230401171727279