您的位置:首页 > 其它

spark (二)

2016-06-20 00:00 204 查看
摘要: 下载与入门

说明: 要在本地运行spark,因为spark是用scala语言写的,运行在JVM上面,你要做的只是安装java 6及以上版本就够了。

2.1 下载spark

访问http://spark.apache.org/downloads.html 的apache 官网下载spark

得到我们的预编译版本的 spark-1.2.0-bin-hadoop2.4.tgz

解压完是这样的:



我们看一下spark的目录结构:

README.md 包含用来入门spark的简单的使用说明

bin 包含可以用来和spark进行各种方式的交互的一些列可执行文件,包括稍后会用到的spark shell

core streaming python。。。 包含spark项目主要组件的源代码

examples 包含一些可以查看和运行的spark程序,对学习spark 的 API非常由帮助。

接下来我们先尝试一下用spark shell 运行一些spark自带的实例代码,然后下回在尝试编译并运行我们自己的spark程序。

2.2 spark 的 shell

spark带有交互式的shell,可以作即使数据分析。和其他的系统中的shell类似,但不同的是在其他shell工具中你只能使用单机的硬盘和内存来操作数据。而spark shell 可以用来与分布式存储在许多台机器的内存或者硬盘上的数据进行交互,并且处理过程的分发由spark自动控制完成。

由于spark能够在工作节点上把数据读取到内存中,所以许多分布式计算都可以在几秒钟之内完成,哪怕是那种在十几个节点上处理TB级别的数据的计算。所以那些交互式的即时探索性分析非常适合spark。spark提供Python及Scala的增强版shell,支持与集群的连接。

spark shell的强大之处 就在于使用某个语言的shell作一些简单的数据分析;

打开spark安装目录,进入bin下面 ./spark-shell 注: 这是scala版本的 spark shell



有点帅~

我建议java开发人员也熟悉一下shell 即使是scala版本的,这对学习spark API 很有帮助。

2.2.1 日志控制

如果觉得shell中输出的日志信息过多而使人分心,可以调整日志的级别来控制输出的信息量。你需要在conf目录下创建一个名为log4j.properties的文件来管理日志设置。

Spark开发者们已经在Spark中加入了一个日志设置文件的模板,我们可以在conf目录下找到 log4j.properties.template

将这个模板复制一份,并去掉.template 放到conf下面使其生效。

我们修改其中的这一行 将日志级别改为WARN 这样日志就会大大减少的



我们可以看到再次启动,日志信息相比第一次已经少了很多了



2.2.2 RDD resilient distributed dataset 弹性分布式数据集

在spark中,我们通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动的在集群上并行进行,这样的数据集被称为弹性分布式数据集,简称RDD。

RDD是Spark对分布式数据和计算的基本抽象

2.2.3 Scala 行数统计

在spark-shell中直接可以输入一下命令进行简单的行数统计 如图:



这是一个scala即时统计的例子

解释一下:



这是创建一个命为lines的RDD, 然后lines.count() 统计RDD中的元素个数,lines.first() 输出RDD中的第一个元素,也就是README.md的第一行。

变量lines是一个RDD,是从你的电脑上的一个本地文本文件创建出来的。我们可以在这个RDD上运行各种并行操作,比如统计这个数据集中的元素个数,或者输出第一个元素。

2.3 Spark 核心概念简介

2.3.1 sparkContext

从上层看,每个spark应用都由一个驱动器程序(driver program)来发起集群上的各种并行操作。驱动器程序包含应用的main函数,并且定义了集群上的分布式数据集。在前面的例子里,实际的驱动器程序就是spark shell 本身,你只要输入想要运行的操作就可以了。

驱动器通过一个spark context对象来访问spark。这个对象代表对计算集群的一个连接。shell启动时已经自动创建了一个spark context对象,是一个叫做sc的变量。我们可以通过在shell中输入sc来查看它的类型。



一旦有了sc ,我们就可以用它来创建RDD,在2.2.3 中 我们调用了sc.textFIle() 来创建了一个代表文件中各行文本的RDD,我们可以在这些行上进行各种数据操作,比如count()。

要执行这些操作,驱动器程序一般要管理多个执行器节点(executor),比如我们在集群上运行count()操作,那么不同的节点会统计文件的不同部分的行数,由于我们刚才是在本地模式下运行spark shell,因此所有的工作都在单个节点上运行,但你可以将这个shell 连接到集群上来进行并行的数据分析。

如下图:



2.3.2 传递函数

扩展2.2.3 中的例子,目标是筛选出文件中包含 “Python”这个单词的行



这就筛选出了包含python这个单词的行。

spark最神奇的地方在于像filter这样的基于函数的操作也会在集群上并行执行。也就是说spark会自动的将函数(比如line.contains("Python"))发到各个执行器节点上。这样你就可以在单一的驱动器程序中编程,并且让代码自动运行在多个节点上。

2.4 在独立程序中应用spark

这一部分是如何在程序中运用spark。除了spark shell, spark 也可以在java scala 或者python中被连接使用。这与在shell中使用的主要区别在于你需要自行初始化SparkContext。然后使用的API就一样了。

连接spark的过程在各个语言中不一样,在java和scala中,只需要给你的工程添加个对于spark-core的Maven依赖。

在maven中央仓库添加对spark的依赖。从maven repositoy中搜索 spark 我们暂且添加1.2版本的spark



完成了应用和spark的连接,然后就是导入spark包并且创建SparkContext。首先你可以通过创建一个SparkConf对象来配置你的应用。基于这个SparkConf创建一个SparkContext对象。

2.4.1 在Java 中初始化Spark

代码如下: 说明见注释

[code=language-java]package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class Test {
public static void main(String[] args) {
// 初始化SparkContext
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
// local是集群URL,是告诉spark如何连接到集群上,我们使用local这个特殊的值可以让spark运行在单机单线程上而无需连接到集群。
// My App 指的是应用名。当连接到集群时这个值可以帮助你在集群管理器的用户界面中找到你的应用。
JavaSparkContext sc = new JavaSparkContext(conf);
// 接下来可以用sc创建各种rdd, 进行各种操作此处省略1000行。。。
System.out.println("blabla...");
// 关闭SparkContext
sc.stop();
}
}

我会在接下来的文章中继续详细讲解spark的用法的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: