Spark Machine Learning (1)
2015-10-13 14:14
1. Setting Up and Running Spark
Spark's local mode is fully compatible with its cluster mode: a program written and tested locally needs only minor configuration changes to run on a cluster. Every Spark program starts from a SparkContext (or a JavaSparkContext when writing in Java). A SparkConf object holds the various configuration parameters for a Spark cluster, and initializing a SparkContext requires a SparkConf object.
To use the Spark shell with Scala, simply run `./bin/spark-shell` from the Spark home directory. To use Spark in a Python shell, run `./bin/pyspark`.
RDDs (Resilient Distributed Datasets)
An RDD can be created from an existing collection, or from a Hadoop-compatible input source such as the local file system, HDFS, or Amazon S3.
In Spark's programming model, all operations fall into two categories: transformations and actions.
A transformation applies a function to every record in a dataset, producing a new, transformed dataset (RDDs themselves are immutable, so the original is never modified in place).
An action runs a computation or aggregation and returns the result to the driver program, the one running the SparkContext.
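Transformations are also lazy: building one does no work until an action forces evaluation. This can be mimicked in plain Python with a lazy `map` generator. The sketch below uses no Spark APIs; it only illustrates the semantics.

```python
# A minimal pure-Python sketch of Spark's lazy transformation/action model.
evaluated = []

def tracked_square(x):
    # record every element actually processed, so laziness is observable
    evaluated.append(x)
    return x * x

# "Transformation": building the pipeline processes no data yet (like rdd.map)
pipeline = map(tracked_square, [1, 2, 3, 4])
assert evaluated == []  # nothing has run so far

# "Action": forcing the pipeline triggers the computation (like rdd.count())
result = list(pipeline)
assert evaluated == [1, 2, 3, 4]
print(result)  # [1, 4, 9, 16]
```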
Another core feature of Spark is the ability to create two special types of variables: broadcast variables and accumulators.
A broadcast variable is read-only and is stored in memory on each worker node. Creating one requires only a single method call on the SparkContext:
`val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))`
An accumulator is also a variable that is broadcast to the worker nodes, but unlike a broadcast variable it can be added to. The addition must be an associative operation, so that the globally accumulated value can be computed correctly in parallel and returned to the driver program. A global accumulator's value may only be read by the driver program.
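Why must the operation be associative? Because each worker reduces its own partition to a subtotal, and the driver then combines subtotals in whatever grouping the scheduler happens to produce. The pure-Python sketch below (no Spark APIs) shows that for an associative operation like addition, any partitioning of the data yields the same total:

```python
from functools import reduce

def accumulate(partitions):
    # each "worker" reduces its own partition, then the "driver" combines
    subtotals = [reduce(lambda a, b: a + b, part) for part in partitions]
    return reduce(lambda a, b: a + b, subtotals)

# two different partitionings of the same data give the same result,
# because addition is associative
total_a = accumulate([[1, 2], [3, 4], [5, 6]])
total_b = accumulate([[1], [2, 3, 4], [5, 6]])
assert total_a == total_b == 21
print(total_a)  # 21
```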
1.1 Getting Started with Spark in Scala
Customer | Product | Price |
---|---|---|
John | iPhone Cover | 9.99 |
John | Headphones | 5.49 |
Jack | iPhone Cover | 9.99 |
Jill | Samsung Galaxy Cover | 8.95 |
Bob | iPad Cover | 5.49 |
```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
 * A simple Spark app in Scala
 */
object ScalaApp {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "First Spark App")

    // we take the raw data in CSV format and convert it into a set of records
    // of the form (user, product, price)
    val data = sc.textFile("data/UserPurchaseHistory.csv")
      .map(line => line.split(","))
      .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))

    // let's count the number of purchases
    val numPurchases = data.count()

    // let's count how many unique users made purchases
    val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()

    // let's sum up our total revenue
    val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()

    // let's find our most popular product
    val productsByPopularity = data
      .map { case (user, product, price) => (product, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(-_._2)
    val mostPopular = productsByPopularity(0)

    // finally, print everything out
    println("Total purchases: " + numPurchases)
    println("Unique users: " + uniqueUsers)
    println("Total revenue: " + totalRevenue)
    println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))

    sc.stop()
  }
}
```
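To sanity-check what the program should print, the same aggregations can be run over the five sample rows in plain Python, with no Spark required. This is only a cross-check sketch of the logic, not part of the Spark app:

```python
# Pure-Python cross-check of the aggregations, using the five sample rows above.
from collections import Counter

rows = [
    ("John", "iPhone Cover", 9.99),
    ("John", "Headphones", 5.49),
    ("Jack", "iPhone Cover", 9.99),
    ("Jill", "Samsung Galaxy Cover", 8.95),
    ("Bob", "iPad Cover", 5.49),
]

num_purchases = len(rows)                           # count()
unique_users = len({user for user, _, _ in rows})   # distinct().count()
total_revenue = sum(price for _, _, price in rows)  # sum()
counts = Counter(product for _, product, _ in rows) # reduceByKey(_ + _)
most_popular, purchases = counts.most_common(1)[0]

print("Total purchases:", num_purchases)            # 5
print("Unique users:", unique_users)                # 4
print("Total revenue:", round(total_revenue, 2))    # 39.91
print("Most popular product: %s with %d purchases" % (most_popular, purchases))
```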
1.2 Getting Started with Spark in Java
The Java API is essentially very similar to the Scala API. Scala code can call Java code easily, but some Scala code cannot be called from Java. Versions of Java prior to 1.8 do not support anonymous functions (lambda expressions), so we often create anonymous inner classes to pass to Spark operations; these classes implement the interface the operation requires, along with its call method.
```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * A simple Spark app in Java
 */
public class JavaApp {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");

        // we take the raw data in CSV format and convert it into a set of records
        // of the form (user, product, price)
        JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv")
                .map(new Function<String, String[]>() {
                    @Override
                    public String[] call(String s) throws Exception {
                        return s.split(",");
                    }
                });

        // let's count the number of purchases
        long numPurchases = data.count();

        // let's count how many unique users made purchases
        long uniqueUsers = data.map(new Function<String[], String>() {
            @Override
            public String call(String[] strings) throws Exception {
                return strings[0];
            }
        }).distinct().count();

        // let's sum up our total revenue
        double totalRevenue = data.mapToDouble(new DoubleFunction<String[]>() {
            @Override
            public double call(String[] strings) throws Exception {
                return Double.parseDouble(strings[2]);
            }
        }).sum();

        // let's find our most popular product
        // first we map the data to records of (product, 1) using a PairFunction
        // and the Tuple2 class.
        // then we call a reduceByKey operation with a Function2, which is
        // essentially the sum function
        List<Tuple2<String, Integer>> pairs = data.mapToPair(new PairFunction<String[], String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String[] strings) throws Exception {
                return new Tuple2<>(strings[1], 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).collect();

        // finally we sort the result. Note we need to create a Comparator
        // that reverses the sort order.
        Collections.sort(pairs, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                return -(o1._2() - o2._2());
            }
        });
        String mostPopular = pairs.get(0)._1();
        int purchases = pairs.get(0)._2();

        // print everything out
        System.out.println("Total purchases: " + numPurchases);
        System.out.println("Unique users: " + uniqueUsers);
        System.out.println("Total revenue: " + totalRevenue);
        System.out.println(String.format("Most popular product: %s with %d purchases",
                mostPopular, purchases));

        sc.stop();
    }
}
```
1.3 Getting Started with Spark in Python
Spark's Python API covers nearly all the functionality that the Scala API provides, though a few features, such as Spark Streaming and some individual API methods, are not yet supported.

```python
"""A simple Spark app in Python"""
from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")

# we take the raw data in CSV format and convert it into a set of records
# of the form (user, product, price)
data = sc.textFile("data/UserPurchaseHistory.csv") \
    .map(lambda line: line.split(",")) \
    .map(lambda record: (record[0], record[1], record[2]))

# let's count the number of purchases
numPurchases = data.count()

# let's count how many unique users made purchases
uniqueUsers = data.map(lambda record: record[0]).distinct().count()

# let's sum up our total revenue
totalRevenue = data.map(lambda record: float(record[2])).sum()

# let's find our most popular product
products = data.map(lambda record: (record[1], 1.0)) \
    .reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

# finally, print everything out
print("Total purchases: %d" % numPurchases)
print("Unique users: %d" % uniqueUsers)
print("Total revenue: %2.2f" % totalRevenue)
print("Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1]))

# stop the SparkContext
sc.stop()
```
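The `reduceByKey` call above merges all values that share a key using the supplied function. Its per-key semantics can be sketched in a few lines of plain Python (no Spark APIs); `reduce_by_key` here is a hypothetical helper for illustration only:

```python
# A minimal pure-Python sketch of reduceByKey's per-key merge semantics.
def reduce_by_key(pairs, func):
    merged = {}
    for key, value in pairs:
        # first value for a key is kept as-is; later values are merged in
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

pairs = [("iPhone Cover", 1.0), ("Headphones", 1.0), ("iPhone Cover", 1.0)]
result = dict(reduce_by_key(pairs, lambda a, b: a + b))
assert result == {"iPhone Cover": 2.0, "Headphones": 1.0}
print(result)
```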