您的位置:首页 > 编程语言 > Java开发

单机和集群环境下的FP-Growth算法java实现(关联规则挖掘)

2017-12-15 17:00 302 查看
目录(?)[+]
FP-Growth简要描述
FP-Growth单机java实现源码
FP-Growth在spark集群上java实现源码
运行结果

1 FP-Growth简要描述

和Apriori算法一样,都是用于关联规则挖掘的算法。Apriori算法每生成一次k频繁项集都需要遍历一次事务数据库,当事务数据库很大时会有频繁的I/O操作,因此只适合找出小数据集的频繁项集;而FP-Growth算法整个过程中,只有两次扫描事务数据库,一次发生在数据预处理(包括去掉事务的ID编号、合并相同事务等),另一次发生在构造FP-Tree的头项表,因此该种算法对于大数据集效率也很高。FP-Growth算法的步骤主要有:数据预处理、构造头项表(需要筛选出满足最小支持度的item)、构造频繁树、接下来就是遍历头项表,递归得到所有模式基,所有频繁项集。

2 FP-Growth单机java实现源码

[java] view plain copy print?<span style="font-size:14px;">package org.min.ml;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * FP-Growth frequent-itemset miner (an improvement over Apriori that avoids
 * candidate generation and repeated database scans).
 *
 * <p>Typical use: construct with an absolute minimum support count, load the
 * transaction database with {@link #loadTransaction(String)} and call
 * {@link #FPGrowth(List, List)} with a {@code null} suffix. Every frequent
 * itemset containing at least two items is printed to standard output as
 * {@code "<support> <item> <item> ..."}.
 *
 * @author ShiMin
 * @date 2015/10/17
 */
public class FPTree
{
    private int minSupport; // minimum support count an item must reach

    /**
     * @param support absolute minimum support count
     */
    public FPTree(int support)
    {
        this.minSupport = support;
    }

    /**
     * Loads the transaction database from a text file.
     *
     * <p>Each non-blank line is one transaction whose items are separated by
     * whitespace. Blank lines are skipped so they do not turn into a
     * transaction holding a single empty item. I/O failures are reported on
     * standard error and whatever was read so far is returned.
     *
     * @param file path of the file to read
     * @return the transactions read, possibly empty, never {@code null}
     */
    public List<List<String>> loadTransaction(String file)
    {
        List<List<String>> transactions = new ArrayList<List<String>>();
        BufferedReader br = null;
        try
        {
            br = new BufferedReader(new FileReader(new File(file)));
            String line;
            while ((line = br.readLine()) != null)
            {
                String trimmed = line.trim();
                if (trimmed.isEmpty())
                {
                    continue;
                }
                transactions.add(Arrays.asList(trimmed.split("\\s+")));
            }
        }
        catch (Exception e)
        {
            // Best effort, as before: report and return what has been read.
            e.printStackTrace();
        }
        finally
        {
            // The reader used to be leaked; always release the file handle.
            if (br != null)
            {
                try
                {
                    br.close();
                }
                catch (Exception ignored)
                {
                    // Nothing useful can be done if close() itself fails.
                }
            }
        }
        return transactions;
    }

    /**
     * Recursively mines frequent itemsets and prints them to standard output.
     *
     * <p>For every header-table item the conditional pattern base (the prefix
     * paths of all tree nodes carrying that item) is collected and mined
     * recursively with the item prepended to the suffix pattern.
     *
     * @param transactions the (conditional) transaction database to mine
     * @param postPattern  suffix pattern accumulated so far; {@code null} on
     *                     the initial call, in which case nothing is printed
     *                     for this level (single items are not reported)
     */
    public void FPGrowth(List<List<String>> transactions, List<String> postPattern)
    {
        List<TNode> headerTable = buildHeaderTable(transactions);
        TNode tree = bulidFPTree(headerTable, transactions);
        // An empty tree means no item reached the minimum support.
        if (tree.getChildren() == null || tree.getChildren().isEmpty())
        {
            return;
        }
        // Report each frequent item of this conditional database with its suffix.
        if (postPattern != null)
        {
            for (TNode head : headerTable)
            {
                StringBuilder line = new StringBuilder();
                line.append(head.getCount()).append(" ").append(head.getItemName());
                for (String item : postPattern)
                {
                    line.append(" ").append(item);
                }
                System.out.println(line);
            }
        }
        for (TNode head : headerTable)
        {
            // New suffix = this item followed by the suffix accumulated so far.
            List<String> newPostPattern = new ArrayList<String>();
            newPostPattern.add(head.getItemName());
            if (postPattern != null)
            {
                newPostPattern.addAll(postPattern);
            }
            // Conditional pattern base: one prefix path per same-item node,
            // repeated as many times as that node was traversed.
            List<List<String>> conditionalBase = new ArrayList<List<String>>();
            for (TNode node = head.getNext(); node != null; node = node.getNext())
            {
                List<String> prefixPath = new ArrayList<String>();
                for (TNode up = node.getParent(); up.getItemName() != null; up = up.getParent())
                {
                    prefixPath.add(up.getItemName());
                }
                // A node hanging directly off the root has no prefix; an empty
                // transaction cannot contribute any item, so skip it.
                if (prefixPath.isEmpty())
                {
                    continue;
                }
                for (int i = node.getCount(); i > 0; i--)
                {
                    conditionalBase.add(prefixPath);
                }
            }
            if (!conditionalBase.isEmpty())
            {
                FPGrowth(conditionalBase, newPostPattern);
            }
        }
    }

    /**
     * Builds the header table: one node per item whose support reaches the
     * minimum, sorted by descending support.
     *
     * <p>An item is counted once per transaction even if it is repeated inside
     * that transaction, so the header support matches the support recorded in
     * the tree.
     *
     * @param transactions transaction database
     * @return header nodes in descending order of support
     */
    public List<TNode> buildHeaderTable(List<List<String>> transactions)
    {
        Map<String, TNode> nodesByItem = new HashMap<String, TNode>();
        for (List<String> transaction : transactions)
        {
            for (int i = 0; i < transaction.size(); ++i)
            {
                String itemName = transaction.get(i);
                // Only the first occurrence inside a transaction counts.
                if (transaction.indexOf(itemName) != i)
                {
                    continue;
                }
                TNode node = nodesByItem.get(itemName);
                if (node == null)
                {
                    nodesByItem.put(itemName, new TNode(itemName)); // starts at count 1
                }
                else
                {
                    node.increaseCount(1);
                }
            }
        }
        List<TNode> frequent = new ArrayList<TNode>();
        for (TNode candidate : nodesByItem.values())
        {
            if (candidate.getCount() >= minSupport)
            {
                frequent.add(candidate);
            }
        }
        Collections.sort(frequent); // TNode orders by descending count
        return frequent;
    }

    /**
     * Builds the FP-tree for the given transactions.
     *
     * @param headertable  header table; its nodes are linked to the tree nodes
     *                     created here through their {@code next} chain
     * @param transactions transaction database
     * @return the root of the tree (a node without item name)
     */
    public TNode bulidFPTree(List<TNode> headertable, List<List<String>> transactions)
    {
        TNode rootNode = new TNode();
        for (List<String> items : transactions)
        {
            LinkedList<String> itemsDesc = sortItemsByDesc(items, headertable);
            // Walk down the part of the path that already exists, bumping counts.
            TNode subtreeRoot = rootNode;
            while (!itemsDesc.isEmpty())
            {
                TNode shared = subtreeRoot.findChildren(itemsDesc.peek());
                if (shared == null)
                {
                    break;
                }
                shared.increaseCount(1);
                subtreeRoot = shared;
                itemsDesc.poll();
            }
            // Whatever is left becomes a new branch below subtreeRoot.
            addSubTree(headertable, subtreeRoot, itemsDesc);
        }
        return rootNode;
    }

    /**
     * Appends the remaining items as a single chain of new nodes below
     * {@code subtreeRoot} and links every new node to the end of the
     * same-item chain of its header-table entry.
     *
     * @param headertable header table
     * @param subtreeRoot node under which the chain is attached
     * @param itemsDesc   items to add, in header-table order; emptied by this call
     */
    public void addSubTree(List<TNode> headertable, TNode subtreeRoot, LinkedList<String> itemsDesc)
    {
        TNode parent = subtreeRoot;
        while (!itemsDesc.isEmpty())
        {
            TNode thisnode = new TNode(itemsDesc.pop());
            parent.getChildren().add(thisnode);
            thisnode.setParent(parent);
            for (TNode head : headertable)
            {
                if (head.getItemName().equals(thisnode.getItemName()))
                {
                    TNode lastNode = head;
                    while (lastNode.getNext() != null)
                    {
                        lastNode = lastNode.getNext();
                    }
                    lastNode.setNext(thisnode);
                    break; // item names are unique in the header table
                }
            }
            parent = thisnode;
        }
    }

    /**
     * Projects a transaction onto the header table: keeps only frequent items,
     * without duplicates, in header-table (descending support) order.
     *
     * @param items       items of one transaction
     * @param headertable header table
     * @return the ordered frequent items of the transaction
     */
    public LinkedList<String> sortItemsByDesc(List<String> items, List<TNode> headertable)
    {
        LinkedList<String> itemsDesc = new LinkedList<String>();
        for (TNode node : headertable)
        {
            if (items.contains(node.getItemName()))
            {
                itemsDesc.add(node.getItemName());
            }
        }
        return itemsDesc;
    }

    /**
     * Command-line entry point.
     *
     * @param args optional: {@code args[0]} transaction file, {@code args[1]}
     *             minimum support count; the original hard-coded values are
     *             used for anything not supplied
     */
    public static void main(String[] args)
    {
        String path = args.length > 0 ? args[0] : "C:\\Users\\shimin\\Desktop\\新建文件夹\\wordcounts.txt";
        int support = args.length > 1 ? Integer.parseInt(args[1]) : 4;
        FPTree fptree = new FPTree(support);
        List<List<String>> transactions = fptree.loadTransaction(path);
        fptree.FPGrowth(transactions, null);
    }

    /**
     * Node of the FP-tree; also used for header-table entries. One node
     * represents one item.
     *
     * @author shimin
     */
    public class TNode implements Comparable<TNode>
    {
        private String itemName;      // item name; null for the tree root
        private int count;            // number of transactions through this node
        private TNode parent;         // parent node in the tree
        private List<TNode> children; // child nodes
        private TNode next;           // next node carrying the same item

        /** Creates a root node: no item name, count 0. */
        public TNode()
        {
            this.children = new ArrayList<TNode>();
        }

        /** Creates a node for {@code name} with an initial count of 1. */
        public TNode(String name)
        {
            this.itemName = name;
            this.count = 1;
            this.children = new ArrayList<TNode>();
        }

        /**
         * @param childName item name to look for; may be {@code null}
         * @return the direct child carrying that item, or {@code null} if none
         */
        public TNode findChildren(String childName)
        {
            for (TNode node : this.getChildren())
            {
                if (node.getItemName().equals(childName))
                {
                    return node;
                }
            }
            return null;
        }

        public TNode getNext()
        {
            return next;
        }

        public TNode getParent()
        {
            return parent;
        }

        public void setNext(TNode next)
        {
            this.next = next;
        }

        public void increaseCount(int num)
        {
            count += num;
        }

        public int getCount()
        {
            return count;
        }

        public String getItemName()
        {
            return itemName;
        }

        public List<TNode> getChildren()
        {
            return children;
        }

        public void setParent(TNode parent)
        {
            this.parent = parent;
        }

        /** Orders nodes by descending count (explicit comparison, no subtraction overflow). */
        @Override
        public int compareTo(TNode o)
        {
            int mine = this.getCount();
            int theirs = o.getCount();
            return (theirs < mine) ? -1 : ((theirs == mine) ? 0 : 1);
        }
    }
}
<span style="font-size:14px;">package org.min.ml;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * FP-Growth frequent-itemset miner (an improvement over Apriori that avoids
 * candidate generation and repeated database scans).
 *
 * <p>Typical use: construct with an absolute minimum support count, load the
 * transaction database with {@link #loadTransaction(String)} and call
 * {@link #FPGrowth(List, List)} with a {@code null} suffix. Every frequent
 * itemset containing at least two items is printed to standard output as
 * {@code "<support> <item> <item> ..."}.
 *
 * @author ShiMin
 * @date 2015/10/17
 */
public class FPTree
{
    private int minSupport; // minimum support count an item must reach

    /**
     * @param support absolute minimum support count
     */
    public FPTree(int support)
    {
        this.minSupport = support;
    }

    /**
     * Loads the transaction database from a text file.
     *
     * <p>Each non-blank line is one transaction whose items are separated by
     * whitespace. Blank lines are skipped so they do not turn into a
     * transaction holding a single empty item. I/O failures are reported on
     * standard error and whatever was read so far is returned.
     *
     * @param file path of the file to read
     * @return the transactions read, possibly empty, never {@code null}
     */
    public List<List<String>> loadTransaction(String file)
    {
        List<List<String>> transactions = new ArrayList<List<String>>();
        BufferedReader br = null;
        try
        {
            br = new BufferedReader(new FileReader(new File(file)));
            String line;
            while ((line = br.readLine()) != null)
            {
                String trimmed = line.trim();
                if (trimmed.isEmpty())
                {
                    continue;
                }
                transactions.add(Arrays.asList(trimmed.split("\\s+")));
            }
        }
        catch (Exception e)
        {
            // Best effort, as before: report and return what has been read.
            e.printStackTrace();
        }
        finally
        {
            // The reader used to be leaked; always release the file handle.
            if (br != null)
            {
                try
                {
                    br.close();
                }
                catch (Exception ignored)
                {
                    // Nothing useful can be done if close() itself fails.
                }
            }
        }
        return transactions;
    }

    /**
     * Recursively mines frequent itemsets and prints them to standard output.
     *
     * <p>For every header-table item the conditional pattern base (the prefix
     * paths of all tree nodes carrying that item) is collected and mined
     * recursively with the item prepended to the suffix pattern.
     *
     * @param transactions the (conditional) transaction database to mine
     * @param postPattern  suffix pattern accumulated so far; {@code null} on
     *                     the initial call, in which case nothing is printed
     *                     for this level (single items are not reported)
     */
    public void FPGrowth(List<List<String>> transactions, List<String> postPattern)
    {
        List<TNode> headerTable = buildHeaderTable(transactions);
        TNode tree = bulidFPTree(headerTable, transactions);
        // An empty tree means no item reached the minimum support.
        if (tree.getChildren() == null || tree.getChildren().isEmpty())
        {
            return;
        }
        // Report each frequent item of this conditional database with its suffix.
        if (postPattern != null)
        {
            for (TNode head : headerTable)
            {
                StringBuilder line = new StringBuilder();
                line.append(head.getCount()).append(" ").append(head.getItemName());
                for (String item : postPattern)
                {
                    line.append(" ").append(item);
                }
                System.out.println(line);
            }
        }
        for (TNode head : headerTable)
        {
            // New suffix = this item followed by the suffix accumulated so far.
            List<String> newPostPattern = new ArrayList<String>();
            newPostPattern.add(head.getItemName());
            if (postPattern != null)
            {
                newPostPattern.addAll(postPattern);
            }
            // Conditional pattern base: one prefix path per same-item node,
            // repeated as many times as that node was traversed.
            List<List<String>> conditionalBase = new ArrayList<List<String>>();
            for (TNode node = head.getNext(); node != null; node = node.getNext())
            {
                List<String> prefixPath = new ArrayList<String>();
                for (TNode up = node.getParent(); up.getItemName() != null; up = up.getParent())
                {
                    prefixPath.add(up.getItemName());
                }
                // A node hanging directly off the root has no prefix; an empty
                // transaction cannot contribute any item, so skip it.
                if (prefixPath.isEmpty())
                {
                    continue;
                }
                for (int i = node.getCount(); i > 0; i--)
                {
                    conditionalBase.add(prefixPath);
                }
            }
            if (!conditionalBase.isEmpty())
            {
                FPGrowth(conditionalBase, newPostPattern);
            }
        }
    }

    /**
     * Builds the header table: one node per item whose support reaches the
     * minimum, sorted by descending support.
     *
     * <p>An item is counted once per transaction even if it is repeated inside
     * that transaction, so the header support matches the support recorded in
     * the tree.
     *
     * @param transactions transaction database
     * @return header nodes in descending order of support
     */
    public List<TNode> buildHeaderTable(List<List<String>> transactions)
    {
        Map<String, TNode> nodesByItem = new HashMap<String, TNode>();
        for (List<String> transaction : transactions)
        {
            for (int i = 0; i < transaction.size(); ++i)
            {
                String itemName = transaction.get(i);
                // Only the first occurrence inside a transaction counts.
                if (transaction.indexOf(itemName) != i)
                {
                    continue;
                }
                TNode node = nodesByItem.get(itemName);
                if (node == null)
                {
                    nodesByItem.put(itemName, new TNode(itemName)); // starts at count 1
                }
                else
                {
                    node.increaseCount(1);
                }
            }
        }
        List<TNode> frequent = new ArrayList<TNode>();
        for (TNode candidate : nodesByItem.values())
        {
            if (candidate.getCount() >= minSupport)
            {
                frequent.add(candidate);
            }
        }
        Collections.sort(frequent); // TNode orders by descending count
        return frequent;
    }

    /**
     * Builds the FP-tree for the given transactions.
     *
     * @param headertable  header table; its nodes are linked to the tree nodes
     *                     created here through their {@code next} chain
     * @param transactions transaction database
     * @return the root of the tree (a node without item name)
     */
    public TNode bulidFPTree(List<TNode> headertable, List<List<String>> transactions)
    {
        TNode rootNode = new TNode();
        for (List<String> items : transactions)
        {
            LinkedList<String> itemsDesc = sortItemsByDesc(items, headertable);
            // Walk down the part of the path that already exists, bumping counts.
            TNode subtreeRoot = rootNode;
            while (!itemsDesc.isEmpty())
            {
                TNode shared = subtreeRoot.findChildren(itemsDesc.peek());
                if (shared == null)
                {
                    break;
                }
                shared.increaseCount(1);
                subtreeRoot = shared;
                itemsDesc.poll();
            }
            // Whatever is left becomes a new branch below subtreeRoot.
            addSubTree(headertable, subtreeRoot, itemsDesc);
        }
        return rootNode;
    }

    /**
     * Appends the remaining items as a single chain of new nodes below
     * {@code subtreeRoot} and links every new node to the end of the
     * same-item chain of its header-table entry.
     *
     * @param headertable header table
     * @param subtreeRoot node under which the chain is attached
     * @param itemsDesc   items to add, in header-table order; emptied by this call
     */
    public void addSubTree(List<TNode> headertable, TNode subtreeRoot, LinkedList<String> itemsDesc)
    {
        TNode parent = subtreeRoot;
        while (!itemsDesc.isEmpty())
        {
            TNode thisnode = new TNode(itemsDesc.pop());
            parent.getChildren().add(thisnode);
            thisnode.setParent(parent);
            for (TNode head : headertable)
            {
                if (head.getItemName().equals(thisnode.getItemName()))
                {
                    TNode lastNode = head;
                    while (lastNode.getNext() != null)
                    {
                        lastNode = lastNode.getNext();
                    }
                    lastNode.setNext(thisnode);
                    break; // item names are unique in the header table
                }
            }
            parent = thisnode;
        }
    }

    /**
     * Projects a transaction onto the header table: keeps only frequent items,
     * without duplicates, in header-table (descending support) order.
     *
     * @param items       items of one transaction
     * @param headertable header table
     * @return the ordered frequent items of the transaction
     */
    public LinkedList<String> sortItemsByDesc(List<String> items, List<TNode> headertable)
    {
        LinkedList<String> itemsDesc = new LinkedList<String>();
        for (TNode node : headertable)
        {
            if (items.contains(node.getItemName()))
            {
                itemsDesc.add(node.getItemName());
            }
        }
        return itemsDesc;
    }

    /**
     * Command-line entry point.
     *
     * @param args optional: {@code args[0]} transaction file, {@code args[1]}
     *             minimum support count; the original hard-coded values are
     *             used for anything not supplied
     */
    public static void main(String[] args)
    {
        String path = args.length > 0 ? args[0] : "C:\\Users\\shimin\\Desktop\\新建文件夹\\wordcounts.txt";
        int support = args.length > 1 ? Integer.parseInt(args[1]) : 4;
        FPTree fptree = new FPTree(support);
        List<List<String>> transactions = fptree.loadTransaction(path);
        fptree.FPGrowth(transactions, null);
    }

    /**
     * Node of the FP-tree; also used for header-table entries. One node
     * represents one item.
     *
     * @author shimin
     */
    public class TNode implements Comparable<TNode>
    {
        private String itemName;      // item name; null for the tree root
        private int count;            // number of transactions through this node
        private TNode parent;         // parent node in the tree
        private List<TNode> children; // child nodes
        private TNode next;           // next node carrying the same item

        /** Creates a root node: no item name, count 0. */
        public TNode()
        {
            this.children = new ArrayList<TNode>();
        }

        /** Creates a node for {@code name} with an initial count of 1. */
        public TNode(String name)
        {
            this.itemName = name;
            this.count = 1;
            this.children = new ArrayList<TNode>();
        }

        /**
         * @param childName item name to look for; may be {@code null}
         * @return the direct child carrying that item, or {@code null} if none
         */
        public TNode findChildren(String childName)
        {
            for (TNode node : this.getChildren())
            {
                if (node.getItemName().equals(childName))
                {
                    return node;
                }
            }
            return null;
        }

        public TNode getNext()
        {
            return next;
        }

        public TNode getParent()
        {
            return parent;
        }

        public void setNext(TNode next)
        {
            this.next = next;
        }

        public void increaseCount(int num)
        {
            count += num;
        }

        public int getCount()
        {
            return count;
        }

        public String getItemName()
        {
            return itemName;
        }

        public List<TNode> getChildren()
        {
            return children;
        }

        public void setParent(TNode parent)
        {
            this.parent = parent;
        }

        /** Orders nodes by descending count (explicit comparison, no subtraction overflow). */
        @Override
        public int compareTo(TNode o)
        {
            int mine = this.getCount();
            int theirs = o.getCount();
            return (theirs < mine) ? -1 : ((theirs == mine) ? 0 : 1);
        }
    }
}


3 FP-Growth在spark集群上java实现源码

[java] view plain copy print?<span style="font-size:14px;">package org.min.fpgrowth;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
/**
* @author ShiMin
* @date 2015/10/19
* @description FPGrowth algorithm runs on spark in java.
*/
public class FPTree
{
public static int SUPPORT_DEGREE = 4;//the support of FPGrowth algorithm
public static String SEPARATOR = “ ”;//line separator

public static void main(String[] args)
{
Logger.getLogger(”org.apache.spark”).setLevel(Level.OFF);
args = new String[]{“hdfs://master:9000/data/input/wordcounts.txt”, “hdfs://master:9000/data/output”};
if(args.length != 2)
{
System.err.println(”USage:<Datapath> <Output>”);
System.exit(1);
}

SparkConf sparkConf = new SparkConf().setAppName(“frequent parttern growth”).setMaster(“local[4]”);
JavaSparkContext ctx = new JavaSparkContext(sparkConf);

//load the transactions data.
JavaRDD<String> lines = ctx.textFile(args[0], 1)
//remove the ID of transaction.
.map(new Function<String, String>()
{
private static final long serialVersionUID = -692074104680411557L;

public String call(String arg0) throws Exception
{
return arg0.substring(arg0.indexOf(“ ”) + 1).trim();
}
});

JavaPairRDD<String, Integer> transactions = constructTransactions(lines);
//run FPGrowth algorithm
FPGrowth(transactions, null, ctx);
//close sparkContext
ctx.close();
}

public static JavaPairRDD<String, Integer> constructTransactions(JavaRDD<String> lines)
{
JavaPairRDD<String, Integer> transactions = lines
//convert lines to <key,value>(or <line,1>) pairs.
.mapToPair(new PairFunction<String, String, Integer>()
{
private static final long serialVersionUID = 5915574436834522347L;

public Tuple2<String, Integer> call(String arg0) throws Exception
{
return new Tuple2<String, Integer>(arg0, 1);
}
})
//combine the same translations.
.reduceByKey(new Function2<Integer, Integer, Integer>()
{
private static final long serialVersionUID = -8075378913994858824L;

public Integer call(Integer arg0, Integer arg1) throws Exception
{
return arg0 + arg1;
}
});
return transactions;
}
/**
* @param transactions
* @param postPattern
*/
public static void FPGrowth(JavaPairRDD<String, Integer> transactions, final List<String> postPattern, JavaSparkContext ctx)
{
//construct headTable
JavaRDD<TNode> headTable = bulidHeadTable(transactions);
List<TNode> headlist = headTable.collect();
//construct FPTree
TNode tree = bulidFPTree(headlist, transactions);
//when the FPTree is empty, then exit the excursion
if(tree.getChildren() == null || tree.getChildren().size() == 0)
{
return;
}
//output the frequent itemSet
if(postPattern != null)
{
for(TNode head : headlist)
{
System.out.print(head.getCount() + ” ” + head.getItemName());
for(String item : postPattern)
{
System.out.print(” ” + item);
}
System.out.println();
}
// headTable.foreach(new VoidFunction<TNode>()
// {
// public void call(TNode head) throws Exception
// {
// System.out.println(head.getCount() + ” ” + head.getItemName());
// for(String item : postPattern)
// {
// System.out.println(“ ” + item);
// }
// }
// });
}
//traverse each item of headTable
for(TNode head : headlist)
{
List<String> newPostPattern = new ArrayList<String>();
newPostPattern.add(head.getItemName());
if(postPattern != null)
{
newPostPattern.addAll(postPattern);
}
//create new transactions
List<String> newTransactionsList = new ArrayList<String>();
TNode nextNode = head.getNext();
while(nextNode != null)
{
int count = head.getCount();
TNode parent = nextNode.getParent();
String tlines = ”“;
while(parent.getItemName() != null)
{
tlines += parent.getItemName() + ” ”;
parent = parent.getParent();
}
while((count–) > 0 && !tlines.equals(“”))
{
newTransactionsList.add(tlines);
}
nextNode = nextNode.getNext();
}
JavaPairRDD<String, Integer> newTransactions = constructTransactions(ctx.parallelize(newTransactionsList));
FPGrowth(newTransactions, newPostPattern, ctx);
}
}

/**
* construct FPTree
* @return the root of FPTree
*/
public static TNode bulidFPTree(List<TNode> headTable, JavaPairRDD<String, Integer> transactions)
{
//create the root node of FPTree
final TNode rootNode = new TNode();

final List<TNode> headItems = headTable;
//convert to transactions which ordered by count DESC and items satisfy the minimum support_degree
JavaPairRDD<LinkedList<String>, Integer> transactionsDesc = transactions.mapToPair(new PairFunction<Tuple2<String,Integer>, LinkedList<String>, Integer>()
{
private static final long serialVersionUID = 4787405828748201473L;

public Tuple2<LinkedList<String>, Integer> call(Tuple2<String, Integer> t)
throws Exception
{
LinkedList<String> descItems = new LinkedList<String>();
List<String> items = Arrays.asList(t._1.split(SEPARATOR));
for(TNode node : headItems)
{
String headName = node.getItemName();
if(items.contains(headName))
{
descItems.add(headName);
}
}
return new Tuple2<LinkedList<String>, Integer>(descItems, t._2);
}
})
.filter(new Function<Tuple2<LinkedList<String>,Integer>, Boolean>()
{
private static final long serialVersionUID = -8157084572151575538L;

public Boolean call(Tuple2<LinkedList<String>, Integer> v1) throws Exception
{
return v1._1.size() > 0;
}
});
List<Tuple2<LinkedList<String>, Integer>> lines = transactionsDesc.collect();
//add each transaction to FPTree
for(Tuple2<LinkedList<String>, Integer> t : lines)
{
LinkedList<String> itemsDesc = t._1;//items to be added to FPTree
int count = t._2;//how many times itemsDesc to be added to FPTree
//find out the root node which add List<String> as subtree
TNode subtreeRoot = rootNode;
if(subtreeRoot.getChildren().size() != 0)
{
TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek());
while(!itemsDesc.isEmpty() && tempNode != null)
{
tempNode.countIncrement(count);
subtreeRoot = tempNode;
itemsDesc.poll();
tempNode = subtreeRoot.findChildren(itemsDesc.peek());
}
}
//add the left items of List<String> to FPTree
addSubTree(headItems, subtreeRoot, itemsDesc, count);
}

// transactionsDesc.foreach(new VoidFunction<Tuple2<LinkedList<String>,Integer>>()
// {
// private static final long serialVersionUID = 8054620367976985036L;
//
// public void call(Tuple2<LinkedList<String>, Integer> t) throws Exception
// {
// LinkedList<String> itemsDesc = t._1;//items to be added to FPTree
// int count = t._2;//how many times itemsDesc to be added to FPTree
// //find out the root node which add List<String> as subtree
// TNode subtreeRoot = rootNode;
// if(subtreeRoot.getChildren().size() != 0)
// {
// TNode tempNode = subtreeRoot.findChildren(itemsDesc.peek());
// while(!itemsDesc.isEmpty() && tempNode != null)
// {
// tempNode.countIncrement(count * 2);
// subtreeRoot = tempNode;
// itemsDesc.poll();
// tempNode = subtreeRoot.findChildren(itemsDesc.peek());
// }
// }
// //add the left items of List<String> to FPTree
// addSubTree(headItems, subtreeRoot, itemsDesc, count);
// }
// });
return rootNode;
}
/**
*
* @param headTable
* @param subtreeRoot
* @param itemsDesc
* @param count
*/
public static void addSubTree(List<TNode> headItems, TNode subtreeRoot, LinkedList<String> itemsDesc, int count)
{
if(itemsDesc.size() > 0)
{
final TNode thisNode = new TNode(itemsDesc.pop(), count);//construct a new node
subtreeRoot.getChildren().add(thisNode);
thisNode.setParent(subtreeRoot);
//add thisNode to the relevant headTable node list
for(TNode t : headItems)
{
if(t.getItemName().equals(thisNode.getItemName()))
{
TNode lastNode = t;
while(lastNode.getNext() != null)
{
lastNode = lastNode.getNext();
}
lastNode.setNext(thisNode);
}
}
subtreeRoot = thisNode;//update thisNode as the current subtreeRoot
//add the left items in itemsDesc recursively
addSubTree(headItems, subtreeRoot, itemsDesc, count);
}
}
/**
* construct the headTable of the format <count, itemName> descended.
* @param transactions
* @return headTable
*/
public static JavaRDD<TNode> bulidHeadTable(JavaPairRDD<String, Integer> transactions)
{
JavaRDD<TNode> headtable = transactions.flatMapToPair(new PairFlatMapFunction<Tuple2<String,Integer>, String, Integer>()
{
private static final long serialVersionUID = -3654849959547730063L;

public Iterable<Tuple2<String, Integer>> call(Tuple2<String, Integer> arg0)
throws Exception
{
List<Tuple2<String, Integer>> t2list = new ArrayList<Tuple2<String, Integer>>();
String[] items = arg0._1.split(SEPARATOR);
int count = arg0._2;
for(String item : items)
{
t2list.add(new Tuple2<String, Integer>(item, count));
}
return t2list;
}
})
//combine the same items.
.reduceByKey(new Function2<Integer, Integer, Integer>()
{
private static final long serialVersionUID = 629605042999702574L;

public Integer call(Integer arg0, Integer arg1) throws Exception
{
return arg0 + arg1;
}
})
//convert <ietm,integer> to <integr,item> format.
.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>()
{
private static final long serialVersionUID = -7017909569876993192L;

public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
throws Exception
{
return new Tuple2<Integer, String>(t._2, t._1);
}
})
//filter out items which satisfies the minimum support_degree.
.filter(new Function<Tuple2<Integer, String>, Boolean>()
{
private static final long serialVersionUID = -3643383589739281939L;

public Boolean call(Tuple2<Integer, String> v1) throws Exception
{
return v1._1 >= SUPPORT_DEGREE;
}
})
//sort items in descent.
.sortByKey(false)
//convert transactions to TNode.
.map(new Function<Tuple2<Integer,String>, TNode>()
{
private static final long serialVersionUID = 16629827688034851L;

public TNode call(Tuple2<Integer, String> v1) throws Exception
{
return new TNode(v1._2, v1._1);
}
});
return headtable;
}
}
</span>
<span style="font-size:14px;">package org.min.fpgrowth;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
/**
 * FP-Growth frequent-itemset mining on Spark (Java API).
 *
 * <p>Transactions are held as {@code <line, multiplicity>} pairs. The header
 * table and the projection of each transaction onto it are computed with RDD
 * operations; the FP-tree itself is built on the driver from the collected
 * result. Frequent itemsets with at least two items are printed to standard
 * output as {@code "<support> <item> <item> ..."}.
 *
 * <p>NOTE(review): {@code TNode} is declared outside this file; it is assumed
 * to be serializable and to expose the accessors used below — confirm.
 *
 * @author ShiMin
 * @date 2015/10/19
 */
public class FPTree
{
    public static int SUPPORT_DEGREE = 4;//the support of FPGrowth algorithm
    public static String SEPARATOR = " ";//item separator inside a transaction line

    /**
     * Entry point.
     *
     * @param args {@code <Datapath> <Output>}; when no argument is given the
     *             original hard-coded HDFS locations are used. The output
     *             path is currently not written to (results go to stdout).
     */
    public static void main(String[] args)
    {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
        // Previously args was overwritten unconditionally, which made the
        // usage check below unreachable and ignored the command line.
        if(args.length == 0)
        {
            args = new String[]{"hdfs://master:9000/data/input/wordcounts.txt", "hdfs://master:9000/data/output"};
        }
        if(args.length != 2)
        {
            System.err.println("USage:<Datapath> <Output>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("frequent parttern growth").setMaster("local[4]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        try
        {
            //load the transactions data and strip the leading transaction ID.
            JavaRDD<String> lines = ctx.textFile(args[0], 1)
            .map(new Function<String, String>()
            {
                private static final long serialVersionUID = -692074104680411557L;

                public String call(String arg0) throws Exception
                {
                    return arg0.substring(arg0.indexOf(" ") + 1).trim();
                }
            });

            JavaPairRDD<String, Integer> transactions = constructTransactions(lines);
            //run FPGrowth algorithm
            FPGrowth(transactions, null, ctx);
        }
        finally
        {
            //always release the context, even when mining fails
            ctx.close();
        }
    }

    /**
     * Collapses identical transaction lines into {@code <line, occurrences>} pairs.
     *
     * @param lines one transaction per element, items separated by {@link #SEPARATOR}
     * @return distinct transactions with their multiplicity
     */
    public static JavaPairRDD<String, Integer> constructTransactions(JavaRDD<String> lines)
    {
        JavaPairRDD<String, Integer> transactions = lines
        //convert lines to <line,1> pairs.
        .mapToPair(new PairFunction<String, String, Integer>()
        {
            private static final long serialVersionUID = 5915574436834522347L;

            public Tuple2<String, Integer> call(String arg0) throws Exception
            {
                return new Tuple2<String, Integer>(arg0, 1);
            }
        })
        //combine identical transactions.
        .reduceByKey(new Function2<Integer, Integer, Integer>()
        {
            private static final long serialVersionUID = -8075378913994858824L;

            public Integer call(Integer arg0, Integer arg1) throws Exception
            {
                return arg0 + arg1;
            }
        });
        return transactions;
    }

    /**
     * Recursively mines frequent itemsets and prints them to standard output.
     *
     * @param transactions (conditional) transaction database
     * @param postPattern  suffix pattern accumulated so far; {@code null} on
     *                     the initial call, in which case nothing is printed
     *                     for this level
     * @param ctx          context used to parallelize conditional databases
     */
    public static void FPGrowth(JavaPairRDD<String, Integer> transactions, final List<String> postPattern, JavaSparkContext ctx)
    {
        //construct headTable
        JavaRDD<TNode> headTable = bulidHeadTable(transactions);
        List<TNode> headlist = headTable.collect();
        //construct FPTree
        TNode tree = bulidFPTree(headlist, transactions);
        //when the FPTree is empty, end the recursion
        if(tree.getChildren() == null || tree.getChildren().size() == 0)
        {
            return;
        }
        //output the frequent itemsets of this level
        if(postPattern != null)
        {
            for(TNode head : headlist)
            {
                StringBuilder line = new StringBuilder();
                line.append(head.getCount()).append(" ").append(head.getItemName());
                for(String item : postPattern)
                {
                    line.append(" ").append(item);
                }
                System.out.println(line);
            }
        }
        //traverse each item of headTable
        for(TNode head : headlist)
        {
            List<String> newPostPattern = new ArrayList<String>();
            newPostPattern.add(head.getItemName());
            if(postPattern != null)
            {
                newPostPattern.addAll(postPattern);
            }
            //conditional pattern base: the prefix path of every node carrying this item
            List<String> newTransactionsList = new ArrayList<String>();
            for(TNode node = head.getNext(); node != null; node = node.getNext())
            {
                StringBuilder prefix = new StringBuilder();
                for(TNode up = node.getParent(); up.getItemName() != null; up = up.getParent())
                {
                    prefix.append(up.getItemName()).append(SEPARATOR);
                }
                //nodes directly below the root have no prefix to contribute
                if(prefix.length() == 0)
                {
                    continue;
                }
                String tlines = prefix.toString();
                //Repeat the path once per transaction that reached THIS node.
                //The header total (head.getCount()) was used here before, which
                //inflated supports whenever the item sits on several branches.
                for(int i = node.getCount(); i > 0; i--)
                {
                    newTransactionsList.add(tlines);
                }
            }
            //an empty conditional base cannot yield any pattern; skip the Spark jobs
            if(!newTransactionsList.isEmpty())
            {
                JavaPairRDD<String, Integer> newTransactions = constructTransactions(ctx.parallelize(newTransactionsList));
                FPGrowth(newTransactions, newPostPattern, ctx);
            }
        }
    }

    /**
     * Constructs the FP-tree on the driver.
     *
     * @param headTable    header table, already collected; its nodes get linked
     *                     to the tree nodes created here
     * @param transactions transaction database
     * @return the root of FPTree
     */
    public static TNode bulidFPTree(List<TNode> headTable, JavaPairRDD<String, Integer> transactions)
    {
        //create the root node of FPTree
        final TNode rootNode = new TNode();

        final List<TNode> headItems = headTable;
        //project every transaction onto the header table (frequent items only, descending support)
        JavaPairRDD<LinkedList<String>, Integer> transactionsDesc = transactions.mapToPair(new PairFunction<Tuple2<String,Integer>, LinkedList<String>, Integer>()
        {
            private static final long serialVersionUID = 4787405828748201473L;

            public Tuple2<LinkedList<String>, Integer> call(Tuple2<String, Integer> t)
            throws Exception
            {
                LinkedList<String> descItems = new LinkedList<String>();
                List<String> items = Arrays.asList(t._1.split(SEPARATOR));
                for(TNode node : headItems)
                {
                    String headName = node.getItemName();
                    if(items.contains(headName))
                    {
                        descItems.add(headName);
                    }
                }
                return new Tuple2<LinkedList<String>, Integer>(descItems, t._2);
            }
        })
        //drop transactions without any frequent item
        .filter(new Function<Tuple2<LinkedList<String>,Integer>, Boolean>()
        {
            private static final long serialVersionUID = -8157084572151575538L;

            public Boolean call(Tuple2<LinkedList<String>, Integer> v1) throws Exception
            {
                return v1._1.size() > 0;
            }
        });
        List<Tuple2<LinkedList<String>, Integer>> lines = transactionsDesc.collect();
        //add each transaction to FPTree
        for(Tuple2<LinkedList<String>, Integer> t : lines)
        {
            LinkedList<String> itemsDesc = t._1;//items to be added to FPTree
            int count = t._2;//multiplicity of this transaction
            //follow the already existing part of the path, adding the multiplicity
            TNode subtreeRoot = rootNode;
            while(!itemsDesc.isEmpty())
            {
                TNode shared = subtreeRoot.findChildren(itemsDesc.peek());
                if(shared == null)
                {
                    break;
                }
                shared.countIncrement(count);
                subtreeRoot = shared;
                itemsDesc.poll();
            }
            //add the remaining items as a new branch
            addSubTree(headItems, subtreeRoot, itemsDesc, count);
        }
        return rootNode;
    }

    /**
     * Appends the remaining items as a chain of new nodes below
     * {@code subtreeRoot} and links each new node to the end of the same-item
     * chain of its header-table entry.
     *
     * @param headItems   header table
     * @param subtreeRoot node under which the chain is attached
     * @param itemsDesc   items to add, in header-table order; emptied by this call
     * @param count       initial count of every created node
     */
    public static void addSubTree(List<TNode> headItems, TNode subtreeRoot, LinkedList<String> itemsDesc, int count)
    {
        TNode parent = subtreeRoot;
        while(!itemsDesc.isEmpty())
        {
            TNode thisNode = new TNode(itemsDesc.pop(), count);//construct a new node
            parent.getChildren().add(thisNode);
            thisNode.setParent(parent);
            //add thisNode to the relevant headTable node list
            for(TNode t : headItems)
            {
                if(t.getItemName().equals(thisNode.getItemName()))
                {
                    TNode lastNode = t;
                    while(lastNode.getNext() != null)
                    {
                        lastNode = lastNode.getNext();
                    }
                    lastNode.setNext(thisNode);
                    break;//item names are unique in the header table
                }
            }
            parent = thisNode;
        }
    }

    /**
     * Constructs the header table: frequent items as nodes, sorted by
     * descending support.
     *
     * <p>An item contributes the transaction's multiplicity once per
     * transaction, even when repeated inside the line; empty tokens (e.g.
     * from blank lines) are ignored.
     *
     * @param transactions transaction database
     * @return headTable
     */
    public static JavaRDD<TNode> bulidHeadTable(JavaPairRDD<String, Integer> transactions)
    {
        JavaRDD<TNode> headtable = transactions.flatMapToPair(new PairFlatMapFunction<Tuple2<String,Integer>, String, Integer>()
        {
            private static final long serialVersionUID = -3654849959547730063L;

            public Iterable<Tuple2<String, Integer>> call(Tuple2<String, Integer> arg0)
            throws Exception
            {
                List<Tuple2<String, Integer>> t2list = new ArrayList<Tuple2<String, Integer>>();
                List<String> seen = new ArrayList<String>();
                String[] items = arg0._1.split(SEPARATOR);
                int count = arg0._2;
                for(String item : items)
                {
                    if(item.isEmpty() || seen.contains(item))
                    {
                        continue;
                    }
                    seen.add(item);
                    t2list.add(new Tuple2<String, Integer>(item, count));
                }
                return t2list;
            }
        })
        //combine the same items.
        .reduceByKey(new Function2<Integer, Integer, Integer>()
        {
            private static final long serialVersionUID = 629605042999702574L;

            public Integer call(Integer arg0, Integer arg1) throws Exception
            {
                return arg0 + arg1;
            }
        })
        //convert <item,count> to <count,item> so the count can serve as sort key.
        .mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>()
        {
            private static final long serialVersionUID = -7017909569876993192L;

            public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
            throws Exception
            {
                return new Tuple2<Integer, String>(t._2, t._1);
            }
        })
        //keep items which satisfy the minimum support_degree.
        .filter(new Function<Tuple2<Integer, String>, Boolean>()
        {
            private static final long serialVersionUID = -3643383589739281939L;

            public Boolean call(Tuple2<Integer, String> v1) throws Exception
            {
                return v1._1 >= SUPPORT_DEGREE;
            }
        })
        //sort items by descending count.
        .sortByKey(false)
        //convert <count,item> pairs to TNode.
        .map(new Function<Tuple2<Integer,String>, TNode>()
        {
            private static final long serialVersionUID = 16629827688034851L;

            public TNode call(Tuple2<Integer, String> v1) throws Exception
            {
                return new TNode(v1._2, v1._1);
            }
        });
        return headtable;
    }
}
</span>

4 运行结果

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐