【Data Algorithms: Recipes for Scaling Up with Hadoop and Spark】Chapter 11: Smarter Email Marketing with the Markov Model
2016-03-30 20:10
Scala implementation of the algorithm:
package com.bbw5.dataalgorithms.spark

import java.text.SimpleDateFormat

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

/**
 * Input record format:
 * <customerID><,><transactionID><,><purchaseDate><,><amount>
 *
 * STEP-1: handle input parameters
 *
 * STEP-2: convert input into RDD[String] where each element is an input record
 *
 * STEP-3: convert RDD[String] into RDD[(K, V)], where
 *         K: customerID
 *         V: (purchaseDate, amount)
 *
 * STEP-4: group transactions by customerID: apply groupByKey() to the
 *         output of STEP-3; the result is RDD[(K2, V2)], where
 *         K2: customerID
 *         V2: Iterable[(purchaseDate, amount)]
 *
 * STEP-5: create the Markov "state sequence" State1, State2, ..., StateN:
 *         map over RDD[(K2, V2)] to generate RDD[(K4, V4)].
 *         First convert (K2, V2) into (K3, V3) pairs [K2 = K3 = K4], where
 *         V3 is V2 sorted by purchaseDate (a sorted "transaction sequence");
 *         then use V3 to create the Markov "state sequence" (as V4).
 *
 * STEP-6: generate the Markov state transitions.
 *         Input is the RDD[(K4, V4)] pairs; output is a matrix over the
 *         states {S1, S2, S3, ...}:
 *
 *            | S1   S2   S3  ...
 *         ---+-----------------------
 *         S1 | <probability-value>
 *         S2 |
 *         S3 |
 *         ...|
 *
 *         which defines the probability of going from one state to another.
 *         After this matrix is built, we can use new data to predict the
 *         next marketing date.
 *
 * STEP-7: emit final output
 *
 * @author baibaiw5
 */
object SparkMarkovModel {

  class CustomPartition(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts

    // Partition by customerID only, so all transactions of one customer land
    // in the same partition; sorting within the partition then orders them
    // by (customerID, purchaseDate).
    override def getPartition(key: Any): Int = {
      key match {
        case (a: String, _: Long) => nonNegativeMod(a.hashCode(), numParts)
        case _                    => nonNegativeMod(key.hashCode(), numParts)
      }
    }

    def nonNegativeMod(x: Int, mod: Int): Int = {
      val rawMod = x % mod
      rawMod + (if (rawMod < 0) mod else 0)
    }

    override def equals(other: Any): Boolean = other match {
      case cp: CustomPartition => cp.numPartitions == numPartitions
      case _                   => false
    }

    override def hashCode: Int = numPartitions
  }

  object DateUtil extends Serializable {
    val DATE_FORMAT = "yyyy-MM-dd"
    val SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT)

    def getDateAsMilliSeconds(date: String): Long = {
      SIMPLE_DATE_FORMAT.parse(date).getTime
    }
  }

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkMarkovModel")
    val sc = new SparkContext(sparkConf)

    val numPartition = 3
    val texts = sc.textFile("G:/temp/data/markovmodel.txt", numPartition)

    // STEP-2/3: parse records into ((customerID, purchaseDateMillis), amount)
    // and sort within each partition so every customer's transactions arrive
    // in purchase-date order.
    val data = texts.map { _.split(",") }
      .filter { _.length == 4 }
      .map { array =>
        ((array(0) -> DateUtil.getDateAsMilliSeconds(array(2))), array(3).toInt)
      }
      .repartitionAndSortWithinPartitions(new CustomPartition(numPartition))
    data.collect().foreach(println)

    // STEP-4: group the (purchaseDate, amount) pairs by customerID
    val data2 = data.map(a => (a._1._1, (a._1._2, a._2))).groupByKey()
    data2.collect().foreach(println)

    // STEP-5/6: build each customer's state sequence, then count the
    // state-to-state transitions across all customers.
    val data3 = data2.flatMap {
      case (_, items) =>
        val pre = items.slice(0, items.size - 1)
        val next = items.slice(1, items.size)
        val states = pre.zip(next).map {
          case (p, n) =>
            // one day = 24 * 60 * 60 * 1000 = 86400000 milliseconds
            val daysDiff = (n._1 - p._1) / 86400000
            // elapsed-time state: S(mall) < 30 days, M(edium) < 60 days, L(arge) otherwise
            val dd = if (daysDiff < 30) "S" else if (daysDiff < 60) "M" else "L"
            // amount state, comparing the next amount with the previous one:
            // L(ess) if it dropped below 90%, E(qual) if within 10%, G(reater) otherwise
            val ad = if (n._2 < 0.9 * p._2) "L" else if (n._2 < 1.1 * p._2) "E" else "G"
            dd + ad
        }
        // emit ((fromState, toState), 1) for each consecutive pair of states
        states.slice(0, states.size - 1).zip(states.slice(1, states.size)).map(_ -> 1)
    }.reduceByKey(_ + _)
    data3.collect().foreach(println)
  }
}
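To make the STEP-5 encoding concrete, here is a minimal pure-Scala sketch (no Spark needed; the `toStateSequence` helper and the sample transactions are illustrative, not from the book) that turns one customer's date-sorted (dateMillis, amount) list into the two-letter Markov states used above:

```scala
object StateSequenceSketch {
  val DayMillis = 86400000L

  // Encode consecutive transaction pairs as two-letter states:
  // first letter = elapsed time (S < 30 days, M < 60 days, L otherwise);
  // second letter = amount change (L dropped below 90%, E within 10%, G above 110%).
  def toStateSequence(txns: Seq[(Long, Int)]): Seq[String] =
    txns.zip(txns.tail).map { case ((prevDate, prevAmt), (nextDate, nextAmt)) =>
      val daysDiff = (nextDate - prevDate) / DayMillis
      val dd = if (daysDiff < 30) "S" else if (daysDiff < 60) "M" else "L"
      val ad = if (nextAmt < 0.9 * prevAmt) "L" else if (nextAmt < 1.1 * prevAmt) "E" else "G"
      dd + ad
    }

  def main(args: Array[String]): Unit = {
    // four purchases: on day 0, 10, 50 and 140, with amounts 100, 100, 130, 60
    val txns = Seq((0L * DayMillis, 100), (10L * DayMillis, 100),
                   (50L * DayMillis, 130), (140L * DayMillis, 60))
    println(toStateSequence(txns)) // List(SE, MG, LL)
  }
}
```

A sequence of N transactions yields N-1 states, which is why the Spark job then zips the state list with its own tail to count transitions.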
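The main program stops at raw transition counts, while STEP-6/STEP-7 call for transition probabilities. A minimal sketch of the missing row normalization over a plain Map (the sample counts are made up; a hypothetical `toProbabilities` helper, not part of the book's code):

```scala
object TransitionMatrixSketch {
  // Row-normalize transition counts: P(to | from) = count(from, to) / total count out of from
  def toProbabilities(counts: Map[(String, String), Int]): Map[(String, String), Double] = {
    val rowTotals = counts.groupBy(_._1._1).map { case (from, row) => from -> row.values.sum }
    counts.map { case ((from, to), c) => (from, to) -> c.toDouble / rowTotals(from) }
  }

  def main(args: Array[String]): Unit = {
    val counts = Map(("SE", "SE") -> 3, ("SE", "MG") -> 1, ("MG", "LL") -> 2)
    toProbabilities(counts).toSeq.sortBy(_._1).foreach {
      case ((from, to), p) => println(f"$from -> $to : $p%.2f")
    }
    // The SE row sums to 4, so P(SE -> SE) = 0.75 and P(SE -> MG) = 0.25;
    // the MG row contains only LL, so P(MG -> LL) = 1.00.
  }
}
```

In the Spark job the same normalization could be done by keying the counts on the from-state and dividing by a per-row total obtained with reduceByKey.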