单机和集群环境下的FP-Growth算法java实现(关联规则挖掘)
2017-12-15 17:00
302 查看
目录
FP-Growth简要描述
FP-Growth单机java实现源码
FP-Growth在spark集群上java实现源码
运行结果
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* FP-tree算法:用于挖掘出事务数据库中的频繁项集,该方法是对APriori算法的改进
*
* @author ShiMin
* @date 2015/10/17
*/
/**
 * FP-Growth (FP-tree) algorithm: mines all frequent itemsets from a
 * transaction database. It improves on Apriori by scanning the data only
 * twice (once to count item frequencies for the header table, once to
 * build the FP-tree) instead of once per candidate-set generation.
 *
 * @author ShiMin
 * @date 2015/10/17
 */
public class FPTree
{
    /** Minimum support count an item must reach to be considered frequent. */
    private int minSupport;

    public FPTree(int support)
    {
        this.minSupport = support;
    }

    /**
     * Loads the transaction database from a text file.
     *
     * @param file path to the file; each line is one transaction whose items
     *             are separated by single spaces
     * @return the list of transactions; empty when the file cannot be read
     */
    public List<List<String>> loadTransaction(String file)
    {
        List<List<String>> transactions = new ArrayList<List<String>>();
        // try-with-resources closes the reader even on error
        // (the original version leaked the file handle on every call)
        try (BufferedReader br = new BufferedReader(new FileReader(new File(file))))
        {
            String line;
            while ((line = br.readLine()) != null)
            {
                transactions.add(Arrays.asList(line.split(" ")));
            }
        } catch (Exception e)
        {
            e.printStackTrace();
        }
        return transactions;
    }

    /**
     * Recursively mines and prints frequent itemsets.
     *
     * @param transactions current (conditional) transaction database
     * @param postPattern  suffix pattern accumulated so far; null on the first
     *                     call, in which case nothing is printed at this level
     */
    public void FPGrowth(List<List<String>> transactions, List<String> postPattern)
    {
        // build the header table (frequent 1-itemsets, count-descending)
        List<TNode> headerTable = buildHeaderTable(transactions);
        // build the FP-tree of this conditional database
        TNode tree = bulidFPTree(headerTable, transactions);
        // stop the recursion when the tree is empty
        if (tree.getChildren() == null || tree.getChildren().size() == 0)
        {
            return;
        }
        // print the frequent itemsets found at this level
        if (postPattern != null)
        {
            for (TNode head : headerTable)
            {
                System.out.print(head.getCount() + " " + head.getItemName());
                for (String item : postPattern)
                {
                    System.out.print(" " + item);
                }
                System.out.println();
            }
        }
        // recurse on the conditional pattern base of every header-table item
        for (TNode head : headerTable)
        {
            List<String> newPostPattern = new LinkedList<String>();
            newPostPattern.add(head.getItemName()); // current pattern base
            // prepend the suffix accumulated so far
            if (postPattern != null)
            {
                newPostPattern.addAll(postPattern);
            }
            // conditional transaction database for this item
            List<List<String>> newTransaction = new LinkedList<List<String>>();
            TNode nextnode = head.getNext();
            // collect the prefix path of every tree node carrying this item
            while (nextnode != null)
            {
                int count = nextnode.getCount();
                List<String> parentNodes = new ArrayList<String>(); // ancestors of nextnode
                TNode parent = nextnode.getParent();
                while (parent.getItemName() != null) // stop at the unnamed root sentinel
                {
                    parentNodes.add(parent.getItemName());
                    parent = parent.getParent();
                }
                // add the prefix path once per occurrence; the resulting
                // frequent pattern is parentNodes -> newPostPattern
                while ((count--) > 0)
                {
                    newTransaction.add(parentNodes);
                }
                // next tree node with the same item name
                nextnode = nextnode.getNext();
            }
            FPGrowth(newTransaction, newPostPattern);
        }
    }

    /**
     * Builds the header table: one node per distinct item that reaches the
     * minimum support, sorted by count in descending order.
     *
     * @return header table, count-descending
     */
    public List<TNode> buildHeaderTable(List<List<String>> transactions)
    {
        List<TNode> list = new ArrayList<TNode>();
        Map<String, TNode> nodesmap = new HashMap<String, TNode>();
        // count the occurrences of every item
        for (List<String> lines : transactions)
        {
            for (int i = 0; i < lines.size(); ++i)
            {
                String itemName = lines.get(i);
                if (!nodesmap.containsKey(itemName)) // first occurrence: create the node
                {
                    nodesmap.put(itemName, new TNode(itemName));
                }
                else // seen before: bump its count
                {
                    nodesmap.get(itemName).increaseCount(1);
                }
            }
        }
        // keep only the items that satisfy the minimum support
        for (TNode item : nodesmap.values())
        {
            if (item.getCount() >= minSupport)
            {
                list.add(item);
            }
        }
        // sort by count, highest first (TNode.compareTo is descending)
        Collections.sort(list);
        return list;
    }

    /**
     * Builds the FP-tree from the transactions.
     * (The "bulid" spelling is kept so existing callers keep compiling.)
     *
     * @param headertable header table produced by buildHeaderTable
     * @return the root of the FP-tree (an unnamed sentinel node)
     */
    public TNode bulidFPTree(List<TNode> headertable, List<List<String>> transactions)
    {
        TNode rootNode = new TNode();
        for (List<String> items : transactions)
        {
            LinkedList<String> itemsDesc = sortItemsByDesc(items, headertable);
            // walk down the existing path shared with this transaction,
            // bumping counts along the way
            TNode subtreeRoot = rootNode;
            if (subtreeRoot.getChildren().size() != 0)
            {
                TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek());
                while (!itemsDesc.isEmpty() && tempNode != null)
                {
                    tempNode.increaseCount(1);
                    subtreeRoot = tempNode;
                    itemsDesc.poll();
                    tempNode = subtreeRoot.findChildren(itemsDesc.peek());
                }
            }
            // attach whatever items remain as a new branch under subtreeRoot
            addSubTree(headertable, subtreeRoot, itemsDesc);
        }
        return rootNode;
    }

    /**
     * Attaches the remaining items as a chain of new nodes under subtreeRoot
     * and links every new node to the end of the header table's
     * same-item node list.
     *
     * @param headertable header table
     * @param subtreeRoot parent node under which the chain is attached
     * @param itemsDesc   items still to be added (consumed by this call)
     */
    public void addSubTree(List<TNode> headertable, TNode subtreeRoot, LinkedList<String> itemsDesc)
    {
        if (itemsDesc.size() > 0)
        {
            TNode thisnode = new TNode(itemsDesc.pop()); // new tree node, count 1
            subtreeRoot.getChildren().add(thisnode);
            thisnode.setParent(subtreeRoot);
            // append thisnode to the end of the matching header-table chain
            for (TNode node : headertable)
            {
                if (node.getItemName().equals(thisnode.getItemName()))
                {
                    TNode lastNode = node;
                    while (lastNode.getNext() != null)
                    {
                        lastNode = lastNode.getNext();
                    }
                    lastNode.setNext(thisnode);
                }
            }
            subtreeRoot = thisnode; // descend to the node just created
            // recursively add the remaining items
            addSubTree(headertable, subtreeRoot, itemsDesc);
        }
    }

    /**
     * Projects a transaction onto the header table: keeps only items present
     * in the header table, in the table's count-descending order.
     */
    public LinkedList<String> sortItemsByDesc(List<String> items, List<TNode> headertable)
    {
        LinkedList<String> itemsDesc = new LinkedList<String>();
        for (TNode node : headertable)
        {
            if (items.contains(node.getItemName()))
            {
                itemsDesc.add(node.getItemName());
            }
        }
        return itemsDesc;
    }

    public static void main(String[] args)
    {
        FPTree fptree = new FPTree(4);
        List<List<String>> transactions = fptree.loadTransaction("C:\\Users\\shimin\\Desktop\\新建文件夹\\wordcounts.txt");
        fptree.FPGrowth(transactions, null);
    }

    /**
     * FP-tree node (one node per item on a tree path).
     * Declared static: it never touches the enclosing FPTree instance, so a
     * non-static inner class would only retain a hidden reference.
     *
     * @author shimin
     */
    public static class TNode implements Comparable<TNode>
    {
        private String itemName;      // item name; null only for the root sentinel
        private int count;            // occurrences in the transaction database
        private TNode parent;         // parent node in the tree
        private List<TNode> children; // child nodes
        private TNode next;           // next tree node with the same item name

        /** Creates the unnamed root sentinel node. */
        public TNode()
        {
            this.children = new ArrayList<TNode>();
        }

        public TNode(String name)
        {
            this.itemName = name;
            this.count = 1;
            this.children = new ArrayList<TNode>();
        }

        /** Returns the direct child carrying childName, or null if absent. */
        public TNode findChildren(String childName)
        {
            for (TNode node : this.getChildren())
            {
                if (node.getItemName().equals(childName))
                {
                    return node;
                }
            }
            return null;
        }

        public TNode getNext()
        {
            return next;
        }

        public TNode getParent()
        {
            return parent;
        }

        public void setNext(TNode next)
        {
            this.next = next;
        }

        public void increaseCount(int num)
        {
            count += num;
        }

        public int getCount()
        {
            return count;
        }

        public String getItemName()
        {
            return itemName;
        }

        public List<TNode> getChildren()
        {
            return children;
        }

        public void setParent(TNode parent)
        {
            this.parent = parent;
        }

        @Override
        public int compareTo(TNode o)
        {
            // descending by count; Integer.compare avoids the overflow risk
            // of the original subtraction idiom
            return Integer.compare(o.getCount(), this.getCount());
        }
    }
}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
* @author ShiMin
* @date 2015/10/19
* @description FPGrowth algorithm runs on spark in java.
*/
public class FPTree
{
public static int SUPPORT_DEGREE = 4;//the support of FPGrowth algorithm
public static String SEPARATOR = “ ”;//line separator
public static void main(String[] args)
{
Logger.getLogger(”org.apache.spark”).setLevel(Level.OFF);
args = new String[]{“hdfs://master:9000/data/input/wordcounts.txt”, “hdfs://master:9000/data/output”};
if(args.length != 2)
{
System.err.println(”USage:<Datapath> <Output>”);
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName(“frequent parttern growth”).setMaster(“local[4]”);
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
//load the transactions data.
JavaRDD<String> lines = ctx.textFile(args[0], 1)
//remove the ID of transaction.
.map(new Function<String, String>()
{
private static final long serialVersionUID = -692074104680411557L;
public String call(String arg0) throws Exception
{
return arg0.substring(arg0.indexOf(“ ”) + 1).trim();
}
});
JavaPairRDD<String, Integer> transactions = constructTransactions(lines);
//run FPGrowth algorithm
FPGrowth(transactions, null, ctx);
//close sparkContext
ctx.close();
}
public static JavaPairRDD<String, Integer> constructTransactions(JavaRDD<String> lines)
{
JavaPairRDD<String, Integer> transactions = lines
//convert lines to <key,value>(or <line,1>) pairs.
.mapToPair(new PairFunction<String, String, Integer>()
{
private static final long serialVersionUID = 5915574436834522347L;
public Tuple2<String, Integer> call(String arg0) throws Exception
{
return new Tuple2<String, Integer>(arg0, 1);
}
})
//combine the same translations.
.reduceByKey(new Function2<Integer, Integer, Integer>()
{
private static final long serialVersionUID = -8075378913994858824L;
public Integer call(Integer arg0, Integer arg1) throws Exception
{
return arg0 + arg1;
}
});
return transactions;
}
/**
* @param transactions
* @param postPattern
*/
public static void FPGrowth(JavaPairRDD<String, Integer> transactions, final List<String> postPattern, JavaSparkContext ctx)
{
//construct headTable
JavaRDD<TNode> headTable = bulidHeadTable(transactions);
List<TNode> headlist = headTable.collect();
//construct FPTree
TNode tree = bulidFPTree(headlist, transactions);
//when the FPTree is empty, then exit the excursion
if(tree.getChildren() == null || tree.getChildren().size() == 0)
{
return;
}
//output the frequent itemSet
if(postPattern != null)
{
for(TNode head : headlist)
{
System.out.print(head.getCount() + ” ” + head.getItemName());
for(String item : postPattern)
{
System.out.print(” ” + item);
}
System.out.println();
}
// headTable.foreach(new VoidFunction<TNode>()
// {
// public void call(TNode head) throws Exception
// {
// System.out.println(head.getCount() + ” ” + head.getItemName());
// for(String item : postPattern)
// {
// System.out.println(“ ” + item);
// }
// }
// });
}
//traverse each item of headTable
for(TNode head : headlist)
{
List<String> newPostPattern = new ArrayList<String>();
newPostPattern.add(head.getItemName());
if(postPattern != null)
{
newPostPattern.addAll(postPattern);
}
//create new transactions
List<String> newTransactionsList = new ArrayList<String>();
TNode nextNode = head.getNext();
while(nextNode != null)
{
int count = head.getCount();
TNode parent = nextNode.getParent();
String tlines = ”“;
while(parent.getItemName() != null)
{
tlines += parent.getItemName() + ” ”;
parent = parent.getParent();
}
while((count–) > 0 && !tlines.equals(“”))
{
newTransactionsList.add(tlines);
}
nextNode = nextNode.getNext();
}
JavaPairRDD<String, Integer> newTransactions = constructTransactions(ctx.parallelize(newTransactionsList));
FPGrowth(newTransactions, newPostPattern, ctx);
}
}
/**
* construct FPTree
* @return the root of FPTree
*/
public static TNode bulidFPTree(List<TNode> headTable, JavaPairRDD<String, Integer> transactions)
{
//create the root node of FPTree
final TNode rootNode = new TNode();
final List<TNode> headItems = headTable;
//convert to transactions which ordered by count DESC and items satisfy the minimum support_degree
JavaPairRDD<LinkedList<String>, Integer> transactionsDesc = transactions.mapToPair(new PairFunction<Tuple2<String,Integer>, LinkedList<String>, Integer>()
{
private static final long serialVersionUID = 4787405828748201473L;
public Tuple2<LinkedList<String>, Integer> call(Tuple2<String, Integer> t)
throws Exception
{
LinkedList<String> descItems = new LinkedList<String>();
List<String> items = Arrays.asList(t._1.split(SEPARATOR));
for(TNode node : headItems)
{
String headName = node.getItemName();
if(items.contains(headName))
{
descItems.add(headName);
}
}
return new Tuple2<LinkedList<String>, Integer>(descItems, t._2);
}
})
.filter(new Function<Tuple2<LinkedList<String>,Integer>, Boolean>()
{
private static final long serialVersionUID = -8157084572151575538L;
public Boolean call(Tuple2<LinkedList<String>, Integer> v1) throws Exception
{
return v1._1.size() > 0;
}
});
List<Tuple2<LinkedList<String>, Integer>> lines = transactionsDesc.collect();
//add each transaction to FPTree
for(Tuple2<LinkedList<String>, Integer> t : lines)
{
LinkedList<String> itemsDesc = t._1;//items to be added to FPTree
int count = t._2;//how many times itemsDesc to be added to FPTree
//find out the root node which add List<String> as subtree
TNode subtreeRoot = rootNode;
if(subtreeRoot.getChildren().size() != 0)
{
TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek());
while(!itemsDesc.isEmpty() && tempNode != null)
{
tempNode.countIncrement(count);
subtreeRoot = tempNode;
itemsDesc.poll();
tempNode = subtreeRoot.findChildren(itemsDesc.peek());
}
}
//add the left items of List<String> to FPTree
addSubTree(headItems, subtreeRoot, itemsDesc, count);
}
// transactionsDesc.foreach(new VoidFunction<Tuple2<LinkedList<String>,Integer>>()
// {
// private static final long serialVersionUID = 8054620367976985036L;
//
// public void call(Tuple2<LinkedList<String>, Integer> t) throws Exception
// {
// LinkedList<String> itemsDesc = t._1;//items to be added to FPTree
// int count = t._2;//how many times itemsDesc to be added to FPTree
// //find out the root node which add List<String> as subtree
// TNode subtreeRoot = rootNode;
// if(subtreeRoot.getChildren().size() != 0)
// {
// TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek());
// while(!itemsDesc.isEmpty() && tempNode != null)
// {
// tempNode.countIncrement(count * 2);
// subtreeRoot = tempNode;
// itemsDesc.poll();
// tempNode = subtreeRoot.findChildren(itemsDesc.peek());
// }
// }
// //add the left items of List<String> to FPTree
// addSubTree(headItems, subtreeRoot, itemsDesc, count);
// }
// });
return rootNode;
}
/**
*
* @param headTable
* @param subtreeRoot
* @param itemsDesc
* @param count
*/
public static void addSubTree(List<TNode> headItems, TNode subtreeRoot, LinkedList<String> itemsDesc, int count)
{
if(itemsDesc.size() > 0)
{
final TNode thisNode = new TNode(itemsDesc.pop(), count);//construct a new node
subtreeRoot.getChildren().add(thisNode);
thisNode.setParent(subtreeRoot);
//add thisNode to the relevant headTable node list
for(TNode t : headItems)
{
if(t.getItemName().equals(thisNode.getItemName()))
{
TNode lastNode = t;
while(lastNode.getNext() != null)
{
lastNode = lastNode.getNext();
}
lastNode.setNext(thisNode);
}
}
subtreeRoot = thisNode;//update thisNode as the current subtreeRoot
//add the left items in itemsDesc recursively
addSubTree(headItems, subtreeRoot, itemsDesc, count);
}
}
/**
* construct the headTable of the format <count, itemName> descended.
* @param transactions
* @return headTable
*/
public static JavaRDD<TNode> bulidHeadTable(JavaPairRDD<String, Integer> transactions)
{
JavaRDD<TNode> headtable = transactions.flatMapToPair(new PairFlatMapFunction<Tuple2<String,Integer>, String, Integer>()
{
private static final long serialVersionUID = -3654849959547730063L;
public Iterable<Tuple2<String, Integer>> call(Tuple2<String, Integer> arg0)
throws Exception
{
List<Tuple2<String, Integer>> t2list = new ArrayList<Tuple2<String, Integer>>();
String[] items = arg0._1.split(SEPARATOR);
int count = arg0._2;
for(String item : items)
{
t2list.add(new Tuple2<String, Integer>(item, count));
}
return t2list;
}
})
//combine the same items.
.reduceByKey(new Function2<Integer, Integer, Integer>()
{
private static final long serialVersionUID = 629605042999702574L;
public Integer call(Integer arg0, Integer arg1) throws Exception
{
return arg0 + arg1;
}
})
//convert <ietm,integer> to <integr,item> format.
.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>()
{
private static final long serialVersionUID = -7017909569876993192L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
throws Exception
{
return new Tuple2<Integer, String>(t._2, t._1);
}
})
//filter out items which satisfies the minimum support_degree.
.filter(new Function<Tuple2<Integer, String>, Boolean>()
{
private static final long serialVersionUID = -3643383589739281939L;
public Boolean call(Tuple2<Integer, String> v1) throws Exception
{
return v1._1 >= SUPPORT_DEGREE;
}
})
//sort items in descent.
.sortByKey(false)
//convert transactions to TNode.
.map(new Function<Tuple2<Integer,String>, TNode>()
{
private static final long serialVersionUID = 16629827688034851L;
public TNode call(Tuple2<Integer, String> v1) throws Exception
{
return new TNode(v1._2, v1._1);
}
});
return headtable;
}
}
</span>
FP-Growth简要描述
FP-Growth单机java实现源码
FP-Growth在spark集群上java实现源码
运行结果
1 FP-Growth简要描述
和Apriori算法一样,FP-Growth也是用于关联规则挖掘的算法。Apriori算法每生成一次k频繁项集都需要遍历一次事务数据库,当事务数据库很大时会有频繁的I/O操作,因此只适合找出小数据集的频繁项集;而FP-Growth算法整个过程中只需两次扫描事务数据库:一次发生在数据预处理(包括去掉事务的ID编号、合并相同事务等),另一次发生在构造FP-Tree的头项表,因此该算法对于大数据集效率也很高。FP-Growth算法的步骤主要有:数据预处理、构造头项表(需要筛选出满足最小支持度的item)、构造频繁树,接下来遍历头项表,递归得到所有模式基,从而得到所有频繁项集。
2 FP-Growth单机java实现源码
package org.min.ml;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* FP-tree算法:用于挖掘出事务数据库中的频繁项集,该方法是对APriori算法的改进
*
* @author ShiMin
* @date 2015/10/17
*/
/**
 * FP-Growth (FP-tree) algorithm: mines all frequent itemsets from a
 * transaction database. It improves on Apriori by scanning the data only
 * twice (once to count item frequencies for the header table, once to
 * build the FP-tree) instead of once per candidate-set generation.
 *
 * @author ShiMin
 * @date 2015/10/17
 */
public class FPTree
{
    /** Minimum support count an item must reach to be considered frequent. */
    private int minSupport;

    public FPTree(int support)
    {
        this.minSupport = support;
    }

    /**
     * Loads the transaction database from a text file.
     *
     * @param file path to the file; each line is one transaction whose items
     *             are separated by single spaces
     * @return the list of transactions; empty when the file cannot be read
     */
    public List<List<String>> loadTransaction(String file)
    {
        List<List<String>> transactions = new ArrayList<List<String>>();
        // try-with-resources closes the reader even on error
        // (the original version leaked the file handle on every call)
        try (BufferedReader br = new BufferedReader(new FileReader(new File(file))))
        {
            String line;
            while ((line = br.readLine()) != null)
            {
                transactions.add(Arrays.asList(line.split(" ")));
            }
        } catch (Exception e)
        {
            e.printStackTrace();
        }
        return transactions;
    }

    /**
     * Recursively mines and prints frequent itemsets.
     *
     * @param transactions current (conditional) transaction database
     * @param postPattern  suffix pattern accumulated so far; null on the first
     *                     call, in which case nothing is printed at this level
     */
    public void FPGrowth(List<List<String>> transactions, List<String> postPattern)
    {
        // build the header table (frequent 1-itemsets, count-descending)
        List<TNode> headerTable = buildHeaderTable(transactions);
        // build the FP-tree of this conditional database
        TNode tree = bulidFPTree(headerTable, transactions);
        // stop the recursion when the tree is empty
        if (tree.getChildren() == null || tree.getChildren().size() == 0)
        {
            return;
        }
        // print the frequent itemsets found at this level
        if (postPattern != null)
        {
            for (TNode head : headerTable)
            {
                System.out.print(head.getCount() + " " + head.getItemName());
                for (String item : postPattern)
                {
                    System.out.print(" " + item);
                }
                System.out.println();
            }
        }
        // recurse on the conditional pattern base of every header-table item
        for (TNode head : headerTable)
        {
            List<String> newPostPattern = new LinkedList<String>();
            newPostPattern.add(head.getItemName()); // current pattern base
            // prepend the suffix accumulated so far
            if (postPattern != null)
            {
                newPostPattern.addAll(postPattern);
            }
            // conditional transaction database for this item
            List<List<String>> newTransaction = new LinkedList<List<String>>();
            TNode nextnode = head.getNext();
            // collect the prefix path of every tree node carrying this item
            while (nextnode != null)
            {
                int count = nextnode.getCount();
                List<String> parentNodes = new ArrayList<String>(); // ancestors of nextnode
                TNode parent = nextnode.getParent();
                while (parent.getItemName() != null) // stop at the unnamed root sentinel
                {
                    parentNodes.add(parent.getItemName());
                    parent = parent.getParent();
                }
                // add the prefix path once per occurrence; the resulting
                // frequent pattern is parentNodes -> newPostPattern
                while ((count--) > 0)
                {
                    newTransaction.add(parentNodes);
                }
                // next tree node with the same item name
                nextnode = nextnode.getNext();
            }
            FPGrowth(newTransaction, newPostPattern);
        }
    }

    /**
     * Builds the header table: one node per distinct item that reaches the
     * minimum support, sorted by count in descending order.
     *
     * @return header table, count-descending
     */
    public List<TNode> buildHeaderTable(List<List<String>> transactions)
    {
        List<TNode> list = new ArrayList<TNode>();
        Map<String, TNode> nodesmap = new HashMap<String, TNode>();
        // count the occurrences of every item
        for (List<String> lines : transactions)
        {
            for (int i = 0; i < lines.size(); ++i)
            {
                String itemName = lines.get(i);
                if (!nodesmap.containsKey(itemName)) // first occurrence: create the node
                {
                    nodesmap.put(itemName, new TNode(itemName));
                }
                else // seen before: bump its count
                {
                    nodesmap.get(itemName).increaseCount(1);
                }
            }
        }
        // keep only the items that satisfy the minimum support
        for (TNode item : nodesmap.values())
        {
            if (item.getCount() >= minSupport)
            {
                list.add(item);
            }
        }
        // sort by count, highest first (TNode.compareTo is descending)
        Collections.sort(list);
        return list;
    }

    /**
     * Builds the FP-tree from the transactions.
     * (The "bulid" spelling is kept so existing callers keep compiling.)
     *
     * @param headertable header table produced by buildHeaderTable
     * @return the root of the FP-tree (an unnamed sentinel node)
     */
    public TNode bulidFPTree(List<TNode> headertable, List<List<String>> transactions)
    {
        TNode rootNode = new TNode();
        for (List<String> items : transactions)
        {
            LinkedList<String> itemsDesc = sortItemsByDesc(items, headertable);
            // walk down the existing path shared with this transaction,
            // bumping counts along the way
            TNode subtreeRoot = rootNode;
            if (subtreeRoot.getChildren().size() != 0)
            {
                TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek());
                while (!itemsDesc.isEmpty() && tempNode != null)
                {
                    tempNode.increaseCount(1);
                    subtreeRoot = tempNode;
                    itemsDesc.poll();
                    tempNode = subtreeRoot.findChildren(itemsDesc.peek());
                }
            }
            // attach whatever items remain as a new branch under subtreeRoot
            addSubTree(headertable, subtreeRoot, itemsDesc);
        }
        return rootNode;
    }

    /**
     * Attaches the remaining items as a chain of new nodes under subtreeRoot
     * and links every new node to the end of the header table's
     * same-item node list.
     *
     * @param headertable header table
     * @param subtreeRoot parent node under which the chain is attached
     * @param itemsDesc   items still to be added (consumed by this call)
     */
    public void addSubTree(List<TNode> headertable, TNode subtreeRoot, LinkedList<String> itemsDesc)
    {
        if (itemsDesc.size() > 0)
        {
            TNode thisnode = new TNode(itemsDesc.pop()); // new tree node, count 1
            subtreeRoot.getChildren().add(thisnode);
            thisnode.setParent(subtreeRoot);
            // append thisnode to the end of the matching header-table chain
            for (TNode node : headertable)
            {
                if (node.getItemName().equals(thisnode.getItemName()))
                {
                    TNode lastNode = node;
                    while (lastNode.getNext() != null)
                    {
                        lastNode = lastNode.getNext();
                    }
                    lastNode.setNext(thisnode);
                }
            }
            subtreeRoot = thisnode; // descend to the node just created
            // recursively add the remaining items
            addSubTree(headertable, subtreeRoot, itemsDesc);
        }
    }

    /**
     * Projects a transaction onto the header table: keeps only items present
     * in the header table, in the table's count-descending order.
     */
    public LinkedList<String> sortItemsByDesc(List<String> items, List<TNode> headertable)
    {
        LinkedList<String> itemsDesc = new LinkedList<String>();
        for (TNode node : headertable)
        {
            if (items.contains(node.getItemName()))
            {
                itemsDesc.add(node.getItemName());
            }
        }
        return itemsDesc;
    }

    public static void main(String[] args)
    {
        FPTree fptree = new FPTree(4);
        List<List<String>> transactions = fptree.loadTransaction("C:\\Users\\shimin\\Desktop\\新建文件夹\\wordcounts.txt");
        fptree.FPGrowth(transactions, null);
    }

    /**
     * FP-tree node (one node per item on a tree path).
     * Declared static: it never touches the enclosing FPTree instance, so a
     * non-static inner class would only retain a hidden reference.
     *
     * @author shimin
     */
    public static class TNode implements Comparable<TNode>
    {
        private String itemName;      // item name; null only for the root sentinel
        private int count;            // occurrences in the transaction database
        private TNode parent;         // parent node in the tree
        private List<TNode> children; // child nodes
        private TNode next;           // next tree node with the same item name

        /** Creates the unnamed root sentinel node. */
        public TNode()
        {
            this.children = new ArrayList<TNode>();
        }

        public TNode(String name)
        {
            this.itemName = name;
            this.count = 1;
            this.children = new ArrayList<TNode>();
        }

        /** Returns the direct child carrying childName, or null if absent. */
        public TNode findChildren(String childName)
        {
            for (TNode node : this.getChildren())
            {
                if (node.getItemName().equals(childName))
                {
                    return node;
                }
            }
            return null;
        }

        public TNode getNext()
        {
            return next;
        }

        public TNode getParent()
        {
            return parent;
        }

        public void setNext(TNode next)
        {
            this.next = next;
        }

        public void increaseCount(int num)
        {
            count += num;
        }

        public int getCount()
        {
            return count;
        }

        public String getItemName()
        {
            return itemName;
        }

        public List<TNode> getChildren()
        {
            return children;
        }

        public void setParent(TNode parent)
        {
            this.parent = parent;
        }

        @Override
        public int compareTo(TNode o)
        {
            // descending by count; Integer.compare avoids the overflow risk
            // of the original subtraction idiom
            return Integer.compare(o.getCount(), this.getCount());
        }
    }
}
<span style="font-size:14px;">package org.min.ml; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; /** * FP-tree算法:用于挖掘出事务数据库中的频繁项集,该方法是对APriori算法的改进 * * @author ShiMin * @date 2015/10/17 */ public class FPTree { private int minSupport;//最小支持度 public FPTree(int support) { this.minSupport = support; } /** * 加载事务数据库 * @param file 文件路径名 文件中每行item由空格分隔 */ public List<List<String>> loadTransaction(String file) { List<List<String>> transactions = new ArrayList<List<String>>(); try { BufferedReader br = new BufferedReader(new FileReader(new File(file))); String line = ""; while((line = br.readLine()) != null) { transactions.add(Arrays.asList(line.split(" "))); } } catch (Exception e) { e.printStackTrace(); } return transactions; } public void FPGrowth(List<List<String>> transactions, List<String> postPattern) { //构建头项表 List<TNode> headerTable = buildHeaderTable(transactions); //构建FP树 TNode tree = bulidFPTree(headerTable, transactions); //当树为空时退出 if (tree.getChildren()== null || tree.getChildren().size() == 0) { return; } //输出频繁项集 if(postPattern!=null) { for (TNode head : headerTable) { System.out.print(head.getCount() + " " + head.getItemName()); for (String item : postPattern) { System.out.print(" " + item); } System.out.println(); } } //遍历每一个头项表节点 for(TNode head : headerTable) { List<String> newPostPattern = new LinkedList<String>(); newPostPattern.add(head.getItemName());//添加本次模式基 //加上将前面累积的前缀模式基 if (postPattern != null) { newPostPattern.addAll(postPattern); } //定义新的事务数据库 List<List<String>> newTransaction = new LinkedList<List<String>>(); TNode nextnode = head.getNext(); //去除名称为head.getItemName()的模式基,构造新的事务数据库 while(nextnode != null) { int count = nextnode.getCount(); List<String> parentNodes = new ArrayList<String>();//nextnode节点的所有祖先节点 TNode parent = 
nextnode.getParent(); while(parent.getItemName() != null) { parentNodes.add(parent.getItemName()); parent = parent.getParent(); } //向事务数据库中重复添加count次parentNodes while((count--) > 0) { newTransaction.add(parentNodes);//添加模式基的前缀 ,因此最终的频繁项为: parentNodes -> newPostPattern } //下一个同名节点 nextnode = nextnode.getNext(); } //每个头项表节点重复上述所有操作,递归 FPGrowth(newTransaction, newPostPattern); } } /** * 构建头项表,按递减排好序 * @return */ public List<TNode> buildHeaderTable(List<List<String>> transactions) { List<TNode> list = new ArrayList<TNode>(); Map<String,TNode> nodesmap = new HashMap<String,TNode>(); //为每一个item构建一个节点 for(List<String> lines : transactions) { for(int i = 0; i < lines.size(); ++i) { String itemName = lines.get(i); if(!nodesmap.keySet().contains(itemName)) //为item构建节点 { nodesmap.put(itemName, new TNode(itemName)); } else //若已经构建过该节点,出现次数加1 { nodesmap.get(itemName).increaseCount(1); } } } //筛选满足最小支持度的item节点 for(TNode item : nodesmap.values()) { if(item.getCount() >= minSupport) { list.add(item); } } //按count值从高到低排序 Collections.sort(list); return list; } /** * 构建FR-tree * @param headertable 头项表 * @return */ public TNode bulidFPTree(List<TNode> headertable, List<List<String>> transactions) { TNode rootNode = new TNode(); for(List<String> items : transactions) { LinkedList<String> itemsDesc = sortItemsByDesc(items, headertable); //寻找添加itemsDesc为子树的父节点 TNode subtreeRoot = rootNode; if(subtreeRoot.getChildren().size() != 0) { TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek()); while(!itemsDesc.isEmpty() && tempNode != null) { tempNode.increaseCount(1); subtreeRoot = tempNode; itemsDesc.poll(); tempNode = subtreeRoot.findChildren(itemsDesc.peek()); } } //将itemsDesc中剩余的节点加入作为subtreeRoot的子树 addSubTree(headertable, subtreeRoot, itemsDesc); } return rootNode; } /** * @param headertable 头项表 * @param subtreeRoot 子树父节点 * @param itemsDesc 被添加的子树 */ public void addSubTree(List<TNode> headertable, TNode subtreeRoot, LinkedList<String> itemsDesc) { if(itemsDesc.size() > 0) { TNode 
thisnode = new TNode(itemsDesc.pop());//构建新节点 subtreeRoot.getChildren().add(thisnode); thisnode.setParent(subtreeRoot); //将thisnode加入头项表对应节点链表的末尾 for(TNode node : headertable) { if(node.getItemName().equals(thisnode.getItemName())) { TNode lastNode = node; while(lastNode.getNext() != null) { lastNode = lastNode.getNext(); } lastNode.setNext(thisnode); } } subtreeRoot = thisnode;//更新父节点为当前节点 //递归添加剩余的items addSubTree(headertable, subtreeRoot, itemsDesc); } } //将items按count从高到低排序 public LinkedList<String> sortItemsByDesc(List<String> items, List<TNode> headertable) { LinkedList<String> itemsDesc = new LinkedList<String>(); for(TNode node : headertable) { if(items.contains(node.getItemName())) { itemsDesc.add(node.getItemName()); } } return itemsDesc; } public static void main(String[] args) { FPTree fptree = new FPTree(4); List<List<String>> transactions = fptree.loadTransaction("C:\\Users\\shimin\\Desktop\\新建文件夹\\wordcounts.txt"); fptree.FPGrowth(transactions, null); } /** * fp-tree节点的数据结构(一个item表示一个节点) * @author shimin * */ public class TNode implements Comparable<TNode> { private String itemName; //项目名 private int count; //事务数据库中出现次数 private TNode parent; //父节点 private List<TNode> children; //子节点 private TNode next;//下一个同名节点 public TNode() { this.children = new ArrayList<TNode>(); } public TNode(String name) { this.itemName = name; this.count = 1; this.children = new ArrayList<TNode>(); } public TNode findChildren(String childName) { for(TNode node : this.getChildren()) { if(node.getItemName().equals(childName)) { return node; } } return null; } public TNode getNext() { return next; } public TNode getParent() { return parent; } public void setNext(TNode next) { this.next = next; } public void increaseCount(int num) { count += num; } public int getCount() { return count; } public String getItemName() { return itemName; } public List<TNode> getChildren() { return children; } public void setParent(TNode parent) { this.parent = parent; } @Override public int 
compareTo(TNode o) { return o.getCount() - this.getCount(); } } }</span>
3 FP-Growth在spark集群上java实现源码
package org.min.fpgrowth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
* @author ShiMin
* @date 2015/10/19
* @description FPGrowth algorithm runs on spark in java.
*/
public class FPTree
{
public static int SUPPORT_DEGREE = 4;//the support of FPGrowth algorithm
public static String SEPARATOR = “ ”;//line separator
public static void main(String[] args)
{
Logger.getLogger(”org.apache.spark”).setLevel(Level.OFF);
args = new String[]{“hdfs://master:9000/data/input/wordcounts.txt”, “hdfs://master:9000/data/output”};
if(args.length != 2)
{
System.err.println(”USage:<Datapath> <Output>”);
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName(“frequent parttern growth”).setMaster(“local[4]”);
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
//load the transactions data.
JavaRDD<String> lines = ctx.textFile(args[0], 1)
//remove the ID of transaction.
.map(new Function<String, String>()
{
private static final long serialVersionUID = -692074104680411557L;
public String call(String arg0) throws Exception
{
return arg0.substring(arg0.indexOf(“ ”) + 1).trim();
}
});
JavaPairRDD<String, Integer> transactions = constructTransactions(lines);
//run FPGrowth algorithm
FPGrowth(transactions, null, ctx);
//close sparkContext
ctx.close();
}
public static JavaPairRDD<String, Integer> constructTransactions(JavaRDD<String> lines)
{
JavaPairRDD<String, Integer> transactions = lines
//convert lines to <key,value>(or <line,1>) pairs.
.mapToPair(new PairFunction<String, String, Integer>()
{
private static final long serialVersionUID = 5915574436834522347L;
public Tuple2<String, Integer> call(String arg0) throws Exception
{
return new Tuple2<String, Integer>(arg0, 1);
}
})
//combine the same translations.
.reduceByKey(new Function2<Integer, Integer, Integer>()
{
private static final long serialVersionUID = -8075378913994858824L;
public Integer call(Integer arg0, Integer arg1) throws Exception
{
return arg0 + arg1;
}
});
return transactions;
}
/**
* @param transactions
* @param postPattern
*/
public static void FPGrowth(JavaPairRDD<String, Integer> transactions, final List<String> postPattern, JavaSparkContext ctx)
{
//construct headTable
JavaRDD<TNode> headTable = bulidHeadTable(transactions);
List<TNode> headlist = headTable.collect();
//construct FPTree
TNode tree = bulidFPTree(headlist, transactions);
//when the FPTree is empty, then exit the excursion
if(tree.getChildren() == null || tree.getChildren().size() == 0)
{
return;
}
//output the frequent itemSet
if(postPattern != null)
{
for(TNode head : headlist)
{
System.out.print(head.getCount() + ” ” + head.getItemName());
for(String item : postPattern)
{
System.out.print(” ” + item);
}
System.out.println();
}
// headTable.foreach(new VoidFunction<TNode>()
// {
// public void call(TNode head) throws Exception
// {
// System.out.println(head.getCount() + ” ” + head.getItemName());
// for(String item : postPattern)
// {
// System.out.println(“ ” + item);
// }
// }
// });
}
//traverse each item of headTable
for(TNode head : headlist)
{
List<String> newPostPattern = new ArrayList<String>();
newPostPattern.add(head.getItemName());
if(postPattern != null)
{
newPostPattern.addAll(postPattern);
}
//create new transactions
List<String> newTransactionsList = new ArrayList<String>();
TNode nextNode = head.getNext();
while(nextNode != null)
{
int count = head.getCount();
TNode parent = nextNode.getParent();
String tlines = ”“;
while(parent.getItemName() != null)
{
tlines += parent.getItemName() + ” ”;
parent = parent.getParent();
}
while((count–) > 0 && !tlines.equals(“”))
{
newTransactionsList.add(tlines);
}
nextNode = nextNode.getNext();
}
JavaPairRDD<String, Integer> newTransactions = constructTransactions(ctx.parallelize(newTransactionsList));
FPGrowth(newTransactions, newPostPattern, ctx);
}
}
/**
 * Builds the FP-tree for the given conditional transaction database.
 *
 * @param headTable    header-table nodes, ordered by support count descending
 * @param transactions pairs of <transaction line, occurrence count>
 * @return the (item-less) root node of the constructed FP-tree
 */
public static TNode bulidFPTree(List<TNode> headTable, JavaPairRDD<String, Integer> transactions)
{
    // root of the FP-tree; it carries no item name
    final TNode rootNode = new TNode();
    final List<TNode> headItems = headTable;
    // project every transaction onto header-table order (support descending);
    // items absent from the header table are dropped implicitly
    JavaPairRDD<LinkedList<String>, Integer> transactionsDesc = transactions.mapToPair(new PairFunction<Tuple2<String,Integer>, LinkedList<String>, Integer>()
    {
        private static final long serialVersionUID = 4787405828748201473L;
        public Tuple2<LinkedList<String>, Integer> call(Tuple2<String, Integer> t)
        throws Exception
        {
            LinkedList<String> descItems = new LinkedList<String>();
            List<String> items = Arrays.asList(t._1.split(SEPARATOR));
            // iterating the (already sorted) header table yields this
            // transaction's frequent items in descending-support order
            for(TNode node : headItems)
            {
                String headName = node.getItemName();
                if(items.contains(headName))
                {
                    descItems.add(headName);
                }
            }
            return new Tuple2<LinkedList<String>, Integer>(descItems, t._2);
        }
    })
    // discard transactions that kept no frequent item at all
    .filter(new Function<Tuple2<LinkedList<String>,Integer>, Boolean>()
    {
        private static final long serialVersionUID = -8157084572151575538L;
        public Boolean call(Tuple2<LinkedList<String>, Integer> v1) throws Exception
        {
            return v1._1.size() > 0;
        }
    });
    List<Tuple2<LinkedList<String>, Integer>> lines = transactionsDesc.collect();
    // insert each ordered transaction into the tree, `count` copies at once
    for(Tuple2<LinkedList<String>, Integer> t : lines)
    {
        LinkedList<String> itemsDesc = t._1; // items still to be placed
        int count = t._2;                    // multiplicity of this transaction
        TNode subtreeRoot = rootNode;
        // walk down the existing path shared with this transaction,
        // bumping the counts of the nodes passed through
        while(!itemsDesc.isEmpty())
        {
            // BUGFIX: test emptiness before peeking so findChildren is never
            // handed a null item name (the original peeked after poll())
            TNode child = subtreeRoot.findChildren(itemsDesc.peek());
            if(child == null)
            {
                break;
            }
            child.countIncrement(count);
            subtreeRoot = child;
            itemsDesc.poll();
        }
        // hang the remaining (unshared) items below the deepest shared node
        addSubTree(headItems, subtreeRoot, itemsDesc, count);
    }
    return rootNode;
}
/**
 * Appends the remaining items of a transaction below subtreeRoot, creating
 * one new node per item, and links every new node onto the end of its
 * header-table entry's same-item chain.
 *
 * @param headItems   header-table nodes whose next-chains index tree nodes
 * @param subtreeRoot deepest existing node shared with the transaction
 * @param itemsDesc   items still to insert, in descending-support order
 * @param count       multiplicity carried by every newly created node
 */
public static void addSubTree(List<TNode> headItems, TNode subtreeRoot, LinkedList<String> itemsDesc, int count)
{
    TNode attachPoint = subtreeRoot;
    // iterative equivalent of the original tail recursion:
    // each remaining item becomes a fresh child of the previous one
    while(!itemsDesc.isEmpty())
    {
        TNode created = new TNode(itemsDesc.pop(), count);
        attachPoint.getChildren().add(created);
        created.setParent(attachPoint);
        // thread the new node onto the tail of its header entry's node list
        for(TNode headEntry : headItems)
        {
            if(headEntry.getItemName().equals(created.getItemName()))
            {
                TNode tail = headEntry;
                while(tail.getNext() != null)
                {
                    tail = tail.getNext();
                }
                tail.setNext(created);
            }
        }
        attachPoint = created;
    }
}
/**
* Builds the header table: one TNode per item whose total support reaches
* SUPPORT_DEGREE, sorted by support count descending.
*
* @param transactions pairs of <transaction line, occurrence count>
* @return RDD of header-table TNodes ordered by count, descending
*/
public static JavaRDD<TNode> bulidHeadTable(JavaPairRDD<String, Integer> transactions)
{
JavaRDD<TNode> headtable = transactions.flatMapToPair(new PairFlatMapFunction<Tuple2<String,Integer>, String, Integer>()
{
private static final long serialVersionUID = -3654849959547730063L;
//explode each transaction into one <item, transactionCount> pair per item.
public Iterable<Tuple2<String, Integer>> call(Tuple2<String, Integer> arg0)
throws Exception
{
List<Tuple2<String, Integer>> t2list = new ArrayList<Tuple2<String, Integer>>();
String[] items = arg0._1.split(SEPARATOR);
int count = arg0._2;
for(String item : items)
{
t2list.add(new Tuple2<String, Integer>(item, count));
}
return t2list;
}
})
//sum the counts of identical items.
.reduceByKey(new Function2<Integer, Integer, Integer>()
{
private static final long serialVersionUID = 629605042999702574L;
public Integer call(Integer arg0, Integer arg1) throws Exception
{
return arg0 + arg1;
}
})
//swap to <count, item> so the pairs can be sorted by count via sortByKey.
.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>()
{
private static final long serialVersionUID = -7017909569876993192L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
throws Exception
{
return new Tuple2<Integer, String>(t._2, t._1);
}
})
//keep only items that meet the minimum support (SUPPORT_DEGREE).
.filter(new Function<Tuple2<Integer, String>, Boolean>()
{
private static final long serialVersionUID = -3643383589739281939L;
public Boolean call(Tuple2<Integer, String> v1) throws Exception
{
return v1._1 >= SUPPORT_DEGREE;
}
})
//sort by count, descending.
.sortByKey(false)
//wrap each <count, item> pair into a header-table TNode.
.map(new Function<Tuple2<Integer,String>, TNode>()
{
private static final long serialVersionUID = 16629827688034851L;
public TNode call(Tuple2<Integer, String> v1) throws Exception
{
return new TNode(v1._2, v1._1);
}
});
return headtable;
}
}
</span>
<span style="font-size:14px;">package org.min.fpgrowth; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; /** * @author ShiMin * @date 2015/10/19 * @description FPGrowth algorithm runs on spark in java. */ public class FPTree { public static int SUPPORT_DEGREE = 4;//the support of FPGrowth algorithm public static String SEPARATOR = " ";//line separator public static void main(String[] args) { Logger.getLogger("org.apache.spark").setLevel(Level.OFF); args = new String[]{"hdfs://master:9000/data/input/wordcounts.txt", "hdfs://master:9000/data/output"}; if(args.length != 2) { System.err.println("USage:<Datapath> <Output>"); System.exit(1); } SparkConf sparkConf = new SparkConf().setAppName("frequent parttern growth").setMaster("local[4]"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); //load the transactions data. JavaRDD<String> lines = ctx.textFile(args[0], 1) //remove the ID of transaction. 
.map(new Function<String, String>() { private static final long serialVersionUID = -692074104680411557L; public String call(String arg0) throws Exception { return arg0.substring(arg0.indexOf(" ") + 1).trim(); } }); JavaPairRDD<String, Integer> transactions = constructTransactions(lines); //run FPGrowth algorithm FPGrowth(transactions, null, ctx); //close sparkContext ctx.close(); } public static JavaPairRDD<String, Integer> constructTransactions(JavaRDD<String> lines) { JavaPairRDD<String, Integer> transactions = lines //convert lines to <key,value>(or <line,1>) pairs. .mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 5915574436834522347L; public Tuple2<String, Integer> call(String arg0) throws Exception { return new Tuple2<String, Integer>(arg0, 1); } }) //combine the same translations. .reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = -8075378913994858824L; public Integer call(Integer arg0, Integer arg1) throws Exception { return arg0 + arg1; } }); return transactions; } /** * @param transactions * @param postPattern */ public static void FPGrowth(JavaPairRDD<String, Integer> transactions, final List<String> postPattern, JavaSparkContext ctx) { //construct headTable JavaRDD<TNode> headTable = bulidHeadTable(transactions); List<TNode> headlist = headTable.collect(); //construct FPTree TNode tree = bulidFPTree(headlist, transactions); //when the FPTree is empty, then exit the excursion if(tree.getChildren() == null || tree.getChildren().size() == 0) { return; } //output the frequent itemSet if(postPattern != null) { for(TNode head : headlist) { System.out.print(head.getCount() + " " + head.getItemName()); for(String item : postPattern) { System.out.print(" " + item); } System.out.println(); } // headTable.foreach(new VoidFunction<TNode>() // { // public void call(TNode head) throws Exception // { // System.out.println(head.getCount() + " " + head.getItemName()); 
// for(String item : postPattern) // { // System.out.println(" " + item); // } // } // }); } //traverse each item of headTable for(TNode head : headlist) { List<String> newPostPattern = new ArrayList<String>(); newPostPattern.add(head.getItemName()); if(postPattern != null) { newPostPattern.addAll(postPattern); } //create new transactions List<String> newTransactionsList = new ArrayList<String>(); TNode nextNode = head.getNext(); while(nextNode != null) { int count = head.getCount(); TNode parent = nextNode.getParent(); String tlines = ""; while(parent.getItemName() != null) { tlines += parent.getItemName() + " "; parent = parent.getParent(); } while((count--) > 0 && !tlines.equals("")) { newTransactionsList.add(tlines); } nextNode = nextNode.getNext(); } JavaPairRDD<String, Integer> newTransactions = constructTransactions(ctx.parallelize(newTransactionsList)); FPGrowth(newTransactions, newPostPattern, ctx); } } /** * construct FPTree * @return the root of FPTree */ public static TNode bulidFPTree(List<TNode> headTable, JavaPairRDD<String, Integer> transactions) { //create the root node of FPTree final TNode rootNode = new TNode(); final List<TNode> headItems = headTable; //convert to transactions which ordered by count DESC and items satisfy the minimum support_degree JavaPairRDD<LinkedList<String>, Integer> transactionsDesc = transactions.mapToPair(new PairFunction<Tuple2<String,Integer>, LinkedList<String>, Integer>() { private static final long serialVersionUID = 4787405828748201473L; public Tuple2<LinkedList<String>, Integer> call(Tuple2<String, Integer> t) throws Exception { LinkedList<String> descItems = new LinkedList<String>(); List<String> items = Arrays.asList(t._1.split(SEPARATOR)); for(TNode node : headItems) { String headName = node.getItemName(); if(items.contains(headName)) { descItems.add(headName); } } return new Tuple2<LinkedList<String>, Integer>(descItems, t._2); } }) .filter(new Function<Tuple2<LinkedList<String>,Integer>, Boolean>() { private 
static final long serialVersionUID = -8157084572151575538L; public Boolean call(Tuple2<LinkedList<String>, Integer> v1) throws Exception { return v1._1.size() > 0; } }); List<Tuple2<LinkedList<String>, Integer>> lines = transactionsDesc.collect(); //add each transaction to FPTree for(Tuple2<LinkedList<String>, Integer> t : lines) { LinkedList<String> itemsDesc = t._1;//items to be added to FPTree int count = t._2;//how many times itemsDesc to be added to FPTree //find out the root node which add List<String> as subtree TNode subtreeRoot = rootNode; if(subtreeRoot.getChildren().size() != 0) { TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek()); while(!itemsDesc.isEmpty() && tempNode != null) { tempNode.countIncrement(count); subtreeRoot = tempNode; itemsDesc.poll(); tempNode = subtreeRoot.findChildren(itemsDesc.peek()); } } //add the left items of List<String> to FPTree addSubTree(headItems, subtreeRoot, itemsDesc, count); } // transactionsDesc.foreach(new VoidFunction<Tuple2<LinkedList<String>,Integer>>() // { // private static final long serialVersionUID = 8054620367976985036L; // // public void call(Tuple2<LinkedList<String>, Integer> t) throws Exception // { // LinkedList<String> itemsDesc = t._1;//items to be added to FPTree // int count = t._2;//how many times itemsDesc to be added to FPTree // //find out the root node which add List<String> as subtree // TNode subtreeRoot = rootNode; // if(subtreeRoot.getChildren().size() != 0) // { // TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek()); // while(!itemsDesc.isEmpty() && tempNode != null) // { // tempNode.countIncrement(count * 2); // subtreeRoot = tempNode; // itemsDesc.poll(); // tempNode = subtreeRoot.findChildren(itemsDesc.peek()); // } // } // //add the left items of List<String> to FPTree // addSubTree(headItems, subtreeRoot, itemsDesc, count); // } // }); return rootNode; } /** * * @param headTable * @param subtreeRoot * @param itemsDesc * @param count */ public static void 
addSubTree(List<TNode> headItems, TNode subtreeRoot, LinkedList<String> itemsDesc, int count) { if(itemsDesc.size() > 0) { final TNode thisNode = new TNode(itemsDesc.pop(), count);//construct a new node subtreeRoot.getChildren().add(thisNode); thisNode.setParent(subtreeRoot); //add thisNode to the relevant headTable node list for(TNode t : headItems) { if(t.getItemName().equals(thisNode.getItemName())) { TNode lastNode = t; while(lastNode.getNext() != null) { lastNode = lastNode.getNext(); } lastNode.setNext(thisNode); } } subtreeRoot = thisNode;//update thisNode as the current subtreeRoot //add the left items in itemsDesc recursively addSubTree(headItems, subtreeRoot, itemsDesc, count); } } /** * construct the headTable of the format <count, itemName> descended. * @param transactions * @return headTable */ public static JavaRDD<TNode> bulidHeadTable(JavaPairRDD<String, Integer> transactions) { JavaRDD<TNode> headtable = transactions.flatMapToPair(new PairFlatMapFunction<Tuple2<String,Integer>, String, Integer>() { private static final long serialVersionUID = -3654849959547730063L; public Iterable<Tuple2<String, Integer>> call(Tuple2<String, Integer> arg0) throws Exception { List<Tuple2<String, Integer>> t2list = new ArrayList<Tuple2<String, Integer>>(); String[] items = arg0._1.split(SEPARATOR); int count = arg0._2; for(String item : items) { t2list.add(new Tuple2<String, Integer>(item, count)); } return t2list; } }) //combine the same items. .reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 629605042999702574L; public Integer call(Integer arg0, Integer arg1) throws Exception { return arg0 + arg1; } }) //convert <ietm,integer> to <integr,item> format. 
.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() { private static final long serialVersionUID = -7017909569876993192L; public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception { return new Tuple2<Integer, String>(t._2, t._1); } }) //filter out items which satisfies the minimum support_degree. .filter(new Function<Tuple2<Integer, String>, Boolean>() { private static final long serialVersionUID = -3643383589739281939L; public Boolean call(Tuple2<Integer, String> v1) throws Exception { return v1._1 >= SUPPORT_DEGREE; } }) //sort items in descent. .sortByKey(false) //convert transactions to TNode. .map(new Function<Tuple2<Integer,String>, TNode>() { private static final long serialVersionUID = 16629827688034851L; public TNode call(Tuple2<Integer, String> v1) throws Exception { return new TNode(v1._2, v1._1); } }); return headtable; } } </span>
4 运行结果
相关文章推荐
- 单机和集群环境下的FP-Growth算法java实现(关联规则挖掘)
- 详解python实现FP-TREE进行关联规则挖掘(带有FP树显示功能)附源代码下载(4)
- 详解python实现FP-TREE进行关联规则挖掘(带有FP树显示功能)附源代码下载(1)
- java实现fp-growth算法
- 数据挖掘笔记-关联规则-FPGrowth-MapReduce实现
- 详解python实现FP-TREE进行关联规则挖掘(带有FP树显示功能)附源代码下载(5)
- 详解python实现FP-TREE进行关联规则挖掘(带有FP树显示功能)附源代码下载(3)
- 数据挖掘-关联分析频繁模式挖掘Apriori、FP-Growth及Eclat算法的JAVA及C++实现
- redis集群环境搭建以及java中jedis客户端集群代码实现
- 机器学习实战——第十一/十二章:关联规则挖掘Apriori算法和FP-growth算法
- redis集群环境搭建以及java中jedis客户端集群代码实现
- 数据挖掘-关联分析频繁模式挖掘Apriori、FP-Growth及Eclat算法的JAVA及C++实现
- redis集群环境搭建以及java中jedis客户端集群代码实现
- 基于FP-Tree的关联规则FP-Growth推荐算法Java实现
- 数据挖掘笔记-关联规则-FPGrowth-简单实现
- 基于FP-Tree的关联规则FP-Growth推荐算法Java实现
- 详解python实现FP-TREE进行关联规则挖掘(带有FP树显示功能)附源代码下载(3)
- 数据挖掘进阶之关联规则挖掘FP-Growth算法
- 详解python实现FP-TREE进行关联规则挖掘(带有FP树显示功能)附源代码下载(4)
- java实现fp-growth算法