
【Data Algorithms: Recipes for Scaling up with Hadoop and Spark】Chapter 11 Smarter Email Marketing with the Markov Model

2016-03-30 20:10
Scala implementation of the algorithm:

package com.bbw5.dataalgorithms.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner
import org.apache.spark.HashPartitioner
import java.text.SimpleDateFormat
/**
* Input record format:
*   <customerID><,><transactionID><,><purchaseDate><,><amount>
*
* STEP-1: handle input parameters
*
* STEP-2: convert input into RDD<String> where each element is an input record
*
* STEP-3: convert RDD<String> into JavaPairRDD<K,V>, where
*         K: customerID
*         V: Tuple2<purchaseDate, Amount>
*
* STEP-4: Group transactions by customerID: apply groupByKey()
*         to the output of STEP-3, the result will be:
*         JavaPairRDD<K2,V2>, where
*         K2: customerID
*         V2: Iterable<Tuple2<purchaseDate, Amount>>
*
* STEP-5: Create Markov "state sequence": State1, State2, ..., StateN
*         mapValues() of JavaPairRDD<K2,V2> and generate JavaPairRDD<K4, V4>
*         First convert (K2, V2) into (K3, V3) pairs [K2 = K3 = K4]
*         V3: sorted(V2) (order is based on purchaseDate)
*         V3: is a sorted "transaction sequence"
*         Then use V3 to create Markov "state sequence" (as V4)
*
*
* STEP-6: Generate Markov State Transition
*         Input is JavaPairRDD<K4, V4> pairs
*         Output is a matrix of states {S1, S2, S3, ...}
*
*            | S1   S2   S3   ...
*         ---+-----------------------
*         S1 |    <probability-value>
*            |
*         S2 |
*            |
*         S3 |
*            |
*         ...|
*
*         which defines the probability of going from one state to
*         another state.  After this matrix is built, we can use new
*         data to predict the next marketing date.
*
* STEP-7: emit final output
*
* @author baibaiw5
*
*/
object SparkMarkovModel {

  class CustomPartition(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts

    override def getPartition(key: Any): Int = key match {
      // partition on customerID only, so all of a customer's
      // transactions land in the same partition
      case (a: String, _: Long) => nonNegativeMod(a.hashCode(), numParts)
      case _                    => nonNegativeMod(key.hashCode(), numParts)
    }

    def nonNegativeMod(x: Int, mod: Int): Int = {
      val rawMod = x % mod
      rawMod + (if (rawMod < 0) mod else 0)
    }

    override def equals(other: Any): Boolean = other match {
      case cp: CustomPartition => cp.numPartitions == numPartitions
      case _                   => false
    }

    override def hashCode: Int = numPartitions
  }

  object DateUtil extends Serializable {
    val DATE_FORMAT = "yyyy-MM-dd"
    val SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT)

    // parse a "yyyy-MM-dd" purchaseDate into epoch milliseconds
    def getDateAsMilliSeconds(date: String): Long = {
      SIMPLE_DATE_FORMAT.parse(date).getTime
    }
  }

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkMarkovModel")
    val sc = new SparkContext(sparkConf)

    // STEP-1: handle input parameters (fall back to a local sample file)
    val input = if (args.length > 0) args(0) else "G:/temp/data/markovmodel.txt"
    val numPartition = 3

    // STEP-2: read the input records
    val texts = sc.textFile(input, numPartition)

    // STEP-3: parse each record into ((customerID, purchaseDate), amount),
    // then sort within partitions so each customer's transactions are ordered by date
    val data = texts.map { _.split(",") }.filter { _.length == 4 }.map { array =>
      ((array(0) -> DateUtil.getDateAsMilliSeconds(array(2))), array(3).toInt)
    }.repartitionAndSortWithinPartitions(new CustomPartition(numPartition))

    data.collect().foreach(println)

    // STEP-4: group transactions by customerID
    val data2 = data.map(a => (a._1._1, (a._1._2, a._2))).groupByKey()
    data2.collect().foreach(println)

    // STEP-5/6: build each customer's Markov state sequence and
    // count the state-to-state transitions
    val data3 = data2.flatMap {
      case (_, items) =>
        // groupByKey does not guarantee value order, so sort by purchaseDate here
        val sorted = items.toSeq.sortBy(_._1)
        val pre = sorted.slice(0, sorted.size - 1)
        val next = sorted.slice(1, sorted.size)
        val stats = pre.zip(next).map {
          case (p, n) =>
            // one day = 24*60*60*1000 = 86400000 milliseconds
            val daysDiff = (n._1 - p._1) / 86400000
            val dd = if (daysDiff < 30) "S" else if (daysDiff < 60) "M" else "L"
            val ad = if (p._2 < 0.9 * n._2) "L" else if (p._2 < 1.1 * n._2) "E" else "G"
            dd + ad
        }
        stats.slice(0, stats.size - 1).zip(stats.slice(1, stats.size)).map(_ -> 1)
    }.reduceByKey(_ + _)

    // STEP-7: emit final output (state-transition pair counts)
    data3.collect().foreach(println)
  }
}
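For intuition, here is a small standalone sketch (not part of the book's listing) of how STEP-5 maps one customer's date-sorted transactions onto Markov states. The customer's dates and amounts are made up for illustration; the 30/60-day and ±10% amount thresholds mirror the code above.

object StateSequenceExample extends App {
  import java.text.SimpleDateFormat
  val fmt = new SimpleDateFormat("yyyy-MM-dd")

  // hypothetical, date-sorted (purchaseDate, amount) pairs for one customer
  val txs = Seq(("2013-01-01", 100), ("2013-01-20", 110), ("2013-03-05", 60))
    .map { case (d, a) => (fmt.parse(d).getTime, a) }

  // same state encoding as in SparkMarkovModel
  val states = txs.zip(txs.tail).map { case ((pd, pa), (nd, na)) =>
    val daysDiff = (nd - pd) / 86400000L
    val dd = if (daysDiff < 30) "S" else if (daysDiff < 60) "M" else "L"
    val ad = if (pa < 0.9 * na) "L" else if (pa < 1.1 * na) "E" else "G"
    dd + ad
  }
  println(states.mkString(" -> "))  // SE -> MG
}

The two transitions yield the sequence SE -> MG: the first pair of purchases is under 30 days apart with roughly equal amounts, the second is 30 to 60 days apart and the amount drops noticeably.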
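The listing stops at raw transition-pair counts in data3, while STEP-6 calls for a probability matrix and STEP-7 hints at prediction. Below is one possible sketch of that final step on plain Scala collections; the counts and state names are hypothetical, and in the real job you would first collect() the small data3 result to the driver.

object TransitionMatrixExample extends App {
  // hypothetical output of data3.collect(): ((fromState, toState), count)
  val counts: Seq[((String, String), Int)] =
    Seq((("SE", "SG"), 3), (("SE", "ME"), 1), (("SG", "SE"), 2))

  // row-normalize: P(to | from) = count(from, to) / sum over to of count(from, to)
  val probabilities: Map[(String, String), Double] =
    counts
      .groupBy { case ((from, _), _) => from }
      .flatMap { case (_, row) =>
        val total = row.map(_._2).sum.toDouble
        row.map { case ((from, to), c) => ((from, to), c / total) }
      }

  // most likely next state after observing "SE"
  val nextAfterSE = probabilities.collect { case ((from, to), p) if from == "SE" => (to, p) }
  println(nextAfterSE.maxBy(_._2))  // (SG,0.75)
}

The first letter of the most probable next state (S, M, or L) then bounds the expected gap to the customer's next purchase, which is what drives the choice of the next marketing date.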