您的位置:首页 > 其它

Spark机器学习1

2015-10-13 14:14 330 查看

1. Spark的环境搭建与运行

Spark的本地模式与集群模式完全兼容,本地编写和测试过的程序仅需增加少许设置便能在集群上运行。

任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的。SparkConf对象包含了Spark集群配置的各种参数,SparkContext的初始化需要一个SparkConf对象。

要想通过Scala来使用Spark shell,只需从Spark的主目录执行
./bin/spark-shell
。想要在Python shell中使用Spark,直接运行
./bin/pyspark
命令即可。

RDD(弹性分布式数据集)

RDD可从现有的集合创建,也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。

在Spark编程模式下,所有的操作被分为Transformation和Action两种。

Transformation是对一个数据集里的所有记录执行某种函数,从而使记录发生改变。

Action通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。

Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。

广播变量为只读变量。Spark下创建广播变量只需在SparkContext上调用一个方法即可:
val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
。广播变量存储在内存中。

累加器也是一种被广播到工作节点的变量,可以累加,这种累加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算以及返回驱动程序。全局累加器只允许驱动程序访问。

1.1 Spark Scala编程入门

客户名称商品名商品价格
JohniPhone Cover9.99
JohnHeadphones5.49
JackiPhone Cover9.99
JillSamsung Galaxy Cover8.95
BobiPad Cover5.49
对于Scala程序而言,需要创建两个文件:Scala代码文件以及项目的构建配置文件。项目将使用SBT来构建。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
* A simple Spark app in Scala
*/
object ScalaApp {

def main(args: Array[String]) {
val sc = new SparkContext("local[2]", "First Spark App")

// we take the raw data in CSV format and convert it into a set of records of the form (user, product, price)
val data = sc.textFile("data/UserPurchaseHistory.csv")
.map(line => line.split(","))
.map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))

// let's count the number of purchases
val numPurchases = data.count()

// let's count how many unique users made purchases
val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()

// let's sum up our total revenue
val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()

// let's find our most popular product
val productsByPopularity = data
.map { case (user, product, price) => (product, 1) }
.reduceByKey(_ + _)
.collect()
.sortBy(-_._2)
val mostPopular = productsByPopularity(0)

// finally, print everything out
println("Total purchases: " + numPurchases)
println("Unique users: " + uniqueUsers)
println("Total revenue: " + totalRevenue)
println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))

sc.stop()
}

}


1.2 Spark Java编程入门

Java API与Scala API本质上很相似。Scala代码可以很方便地调用Java代码,但某些Scala代码却无法在Java里调用。

1.8及之前版本的Java并不支持匿名函数,我们经常会创建临时类来传递给Spark操作。这些类会实现操作所需的接口以及call函数。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
* A simple Spark app in Java
*/
public class JavaApp {

public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
// we take the raw data in CSV format and convert it into a set of records of the form (user, product, price)
JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv")
.map(new Function<String, String[]>() {
@Override
public String[] call(String s) throws Exception {
return s.split(",");
}
});

// let's count the number of purchases
long numPurchases = data.count();

// let's count how many unique users made purchases
long uniqueUsers = data.map(new Function<String[], String>() {
@Override
public String call(String[] strings) throws Exception {
return strings[0];
}
}).distinct().count();

// let's sum up our total revenue
double totalRevenue = data.map(new DoubleFunction<String[]>() {
@Override
public Double call(String[] strings) throws Exception {
return Double.parseDouble(strings[2]);
}
}).sum();

// let's find our most popular product
// first we map the data to records of (product, 1) using a PairFunction
// and the Tuple2 class.
// then we call a reduceByKey operation with a Function2, which is essentially the sum function
List<Tuple2<String, Integer>> pairs = data.map(new PairFunction<String[], String, Integer>() {
@Override
public Tuple2<String, Integer> call(String[] strings) throws Exception {
return new Tuple2(strings[1], 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).collect();

// finally we sort the result. Note we need to create a Comparator function,
// that reverses the sort order.
Collections.sort(pairs, new Comparator<Tuple2<String, Integer>>() {
@Override
public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
return -(o1._2() - o2._2());
}
});
String mostPopular = pairs.get(0)._1();
int purchases = pairs.get(0)._2();

// print everything out
System.out.println("Total purchases: " + numPurchases);
System.out.println("Unique users: " + uniqueUsers);
System.out.println("Total revenue: " + totalRevenue);
System.out.println(String.format("Most popular product: %s with %d purchases",
mostPopular, purchases));

sc.stop();

}

}


1.3 Spark Python编程入门

Spark的Python API几乎覆盖了所有Scala API所能提供的功能,但的确有些特性,比如Spark Streaming和个别的API方法,暂不支持。

"""A simple Spark app in Python"""
from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")
# we take the raw data in CSV format and convert it into a set of records of the form (user, product, price)
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line: line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# let's count the number of purchases
numPurchases = data.count()
# let's count how many unique users made purchases
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# let's sum up our total revenue
totalRevenue = data.map(lambda record: float(record[2])).sum()
# let's find our most popular product
products = data.map(lambda record: (record[1], 1.0)).reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

# Finally, print everything out
print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])

# stop the SparkContext
sc.stop()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: