您的位置:首页 > 其它

Spark-Mllib(一)数据类型

2016-03-28 23:01 225 查看
一.本地向量

  有如下几个类:Vector(基类),DenseVector,SparseVector,Vectors(工厂方法,推荐用)

工厂模式是:定义一个用于创建对象的接口,让子类决定实例化哪一个类,使一个类的实例化延迟到子类

importorg.apache.spark.mllib.linalg.{Vectors,Vector}#linalgisshortforlinearalgebra

valv1=Vectors.dense(1.0,2.0,3.0)#定义1

valv2=Vectors.sparse(3,(1,2),(10,100))#长度为3,第1,2个位置的值为10和100

valv3=Vectors.sparse(3,Seq((1,10),(2,100)))#结果同上

二.带有标签的向量

  主要应用在有监督学习中,二分类(0,1),多分类(0,1,2,3,....)

importorg.apache.spark.mllib.Regression.LabeledPoint;

valvl1=LabeledPoint(1,Vectors.dense(1,2,3,4))

valvl2=LabeledPoint(0,Vectors.sparse(3,(1,2),(10,100)))

三.读取LIBSVM格式的数据

<label><index1>:<value1><index2>:<value2>...
其中<label>是训练数据集的目标值,对于分类,它是标识某类的整数(支持多个类);对于回归,是任意实数。<index>是以1开始的整数,可以是不连续的;<value>;为实数,也就是我们常说的自变量。检验数据文件中的label只用于计算准确度或误差,如果它是未知的,只需用一个数填写这一栏,也可以空着不填.
例如:

01:103:19
11:183:204:178

importorg.apache.spark.mllib.regression.LabeledPoint
importorg.apache.spark.mllib.util.MLUtils
importorg.apache.spark.rdd.RDD

valsvmfile=MLUtils.loadLibSVMFile(sc,"svmdata2")

四.创建本地矩阵
  本地矩阵是行列号索引,值为double类型的数据,存储在单独的机器上.支持稠密矩阵和稀疏矩阵。
与Vector和Vectors的关系类似,Matrix有对应的Matrices
  对于稀疏矩阵的压缩方法,具体可以参考http://www.tuicool.com/articles/A3emmqi,spark默认的为CSC格式的压缩

importorg.apache.spark.mllib.linalg.{Matrix,Matrices}
valm1=Matrices.dense(3,2,Array(1,2,3,4,5,6))
valm2=Matrices.sparse(3,2,Array(0,1,3),Array(0,2,1),Array(9,6,8))
参考csc压缩方法,m2 手工算的结果,应该是
(0,0)9
(2,0)6
(1,1)8
与spark计算的有出入。

五.分布式矩阵
  选择一个正确的形式去存储大的分布式矩阵非常重要,将分布式矩阵转化为不同的格式需要全局的shuffle,代价很大。目前有三种类型的分布式矩阵,RowMatrix,IndexedRowMatrix,CoordinateMatrix.
  什么是shuffle呢?参考http://dongxicheng.org/framework-on-yarn/apache-spark-shuffle-details/
通常shuffle分为两部分,map阶段的数据准备以及Reduce阶段的数据拷贝,Map阶段需要根据Reduce阶段的Task数量决定每个MapTask输出的数据分片数目

RowMatrix是没有行索引,例如一些特征向量,没一行是一个本地向量。
IndexedRowMatrix,有行索引,可以用于识别行和执行链接操作
CoordinateMatrix存成COO形式

构造RowMatrix
importorg.apache.spark.mllib.linalg.{Vector,Vectors}
importorg.apache.spark.mllib.linalg.distributed.RowMatrix
valdata=sc.parallelize(1to9,3)#RDD形式
valrows=data.map(x=>Vectors.dense(x))
valm1=newRowMatrix(rows,3,3)
m1.numRows
m1.numCols

构造IndexedRowMatrix
importorg.apache.spark.mllib.linalg.distributed.{IndexedRow,IndexedRowMatrix,RowMatrix}
valdata1=sc.parallelize(1to12,2)
valrows1=data1.map(x=>IndexedRow(2,Vectors.dense(x)))
valmat=newIndexedRowMatrix(rows1,3,4)
mat.numRows()
mat.numCols()

构造COO#对于稀疏矩阵比较有用,指定非空元素的行列以及value即可
importorg.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}

valdata2=sc.parallelize(1to20,4)

valrows2=data1.map(x=>MatrixEntry(1,1,3))

valm2=newCoordinateMatrix(rows2,4,5)
#
valdata3=sc.textFile("coo").map(_.split('')).map(_.map(_.toDouble)).map(m=>(m(0).toLong,m(1).toLong,m(2))).map(x=>newMatrixEntry(x._1,x._2,x._3))
valm3=newCoordinateMatrix(data3,3,4)

#构造BlockMatrix
valm4=m3.toBlockMatrix()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: