您的位置:首页 > 其它

Machine Learning on Spark--Data Type

2016-02-29 15:04 323 查看

1. Local vector 本地向量

本地向量拥有长整型的integer-based,基于0指标的0-based,双精度double的值,存储在一台单独的本地机器上。MLlib支持两种类型的局部变量:稠密的dense和稀疏的sparse。

1.1 dense vector稠密向量

稠密向量是由双精度的数组支持,比如,(1.0,0.0,3.0),稠密向量显示为[1.0,0.0,3.0]

//导入向量包
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// 创建一个稠密向量(1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)

//打印稠密变量的第3个值
println(dv(2))


1.2 sparse vector稀疏向量

稀疏变量是由两个平行的数组支持:由指标indices与值value组成.

比如,向量(1.0,0.0,3.0),稀疏向量表示为[3,[0,2],[1.0,3.0]]。3为向量的大小,[0,2]为向量的下标,[1.0,3.0]为向量的值。这表示下标为0的向量值为1,下标为2的向量值为3,没有表示出来的是下标为1的向量,默认为0.

另一种创建稀疏向量方法是(3, Seq((0, 1.0), (2, 3.0))),3仍然表示向量的大小,Seq((0, 1.0), (2, 3.0))中(0, 1.0)表示下标为0的向量值为1.0,(2, 3.0)表示下标为2的向量(第三个向量)的值为3.

稀疏向量往往应用于某个值出现较多的时候。

//导入向量包
import org.apache.spark.mllib.linalg.{Vector, Vectors}

//创建一个稀疏向量 (1.0, 0.0, 3.0) 方式一:采用数组方式
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))

// 创建一个稀疏向量 (1.0, 0.0, 3.0) 方式二:采用序列方式
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

//打印稀疏变量的第3个值
println(sv1(2))


注:Scala imports scala.collection.immutable.Vector by default, so you have to import org.apache.spark.mllib.linalg.Vector explicitly to use MLlib’s Vector.

scala会默认输入不可变向量的包,所以在建立MLlib的向量之前要先输入rg.apache.spark.mllib.linalg.Vector包

2. Labled Point 标签向量

labled point相当于预测中的目标变量或应变量,在training中是给出样本的目标值,在test中是需要被预测的目标值。

labled point也是一个本地向量,常被适用于监督学习算法中。我们用双精度类型去存储label,所以labeled point可以被用于回归(连续型变量)和分类(类别型变量)中。对于二元分类,label可以是0(负)或是1(正);对于多元分类,label需是从0开始的一串类别指标(0,1,2…)

//导入包:可变向量包;回归中的标签向量包
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

//创建一个“正”的标签,对应一组稠密特征向量
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
//打印标记点的特征数据(内容数据)
println(pos.features)
//打印标记
println(pos.label)

// 创建一个“负”的标签,对应两组稠密特征向量
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
//打印标记点的特征数据(内容数据)
println(neg.features)
//打印标记
println(neg.label)


括号中的数组无论是稀疏还是稠密向量,表示的是这组观测值的特征feature,表示有一个实例,当出现这些feature的时候,目标变量被标记为1或0,在机器学习中,大量这样的观测值,可以用于预测,当出现某些特征时,目标变量便会属于哪一类型。

在实际中,sparse training data非常常用。MLlib支持读取以LIBSVM格式存储的训练实例。它是以文本形式存储一行一行的观测值,每个观测值由多个features和一个labeled point的组成。格式如下

label index1:value1 index2:value2 ...


MLUtils.loadLibSVMFile 可以将读取的训练实例以LIBSVM格式存储

//需要导入的包
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

//读取本地某文件,并以LibSVM文件格式存储
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")


3. Local Maxtix本地矩阵

本地矩阵拥有整数的行与列,双精度的值,存储在一个单独的本地机器上。MLlib支持dense稠密矩阵.矩阵的值是存储在单行的双精度数组里,

如下例子,创建一个3*2的稠密矩阵,是写在单行中的

//导入矩阵包
import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// 创建一个稠密矩阵 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
//打印矩阵
println(dm)


4. Distributed Matrix 分布式矩阵

A distributed matrix has long-typed row and column indices and double-typed values,

,分布地存储在一个或多个RDD中。选择一个适当的格式去存储海量的分布式矩阵非重要。将一个分布式的矩阵转换成其他格式需要全局的shuffle,非常耗资源。目前为止(spark1.6.0)有三种分布式矩阵。

4.1 BlockMtrix 分块矩阵

A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks.MatrixBlocks是((Int,Int),Matrix)形式的元组,(Int,Int)是Blockd index索引,Matrix是rowsPerBlock x colsPerBlock.

即将原矩阵分成若干个块,每个块为原矩阵的子矩阵。

默认的创建大小是1024*1024,用户可以自定义block的大小。

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)


4.2 RowMatrix 行矩阵

A RowMatrix is a row-oriented distributed matrix without meaningful row indices, backed by an RDD of its rows, where each row is a local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice.

行矩阵是以行作为基本方向的矩阵存储格式,列的作用相对较少,行矩阵每一行就相当于一个具有相同格式的向量数据,且每一行的向量内容都可以单独取出来进行操作。

每一行都是一个local vector.因此列的数目不能太大!

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

//创建一个RDD向量
val rows: RDD[Vector] = sc.parrallelize((Array(1,2,3),Array(4,5,6),Array(7,8,9))).map(d=>vector.dense(d))

// 根据RDD向量,创建一个 RowMatrix .
val mat: RowMatrix = new RowMatrix(rows)

// 获得rowmatrix的行与列大小
val m = mat.numRows()
val n = mat.numCols()

println(m)
println(n)


4.3 IndexedRowMatrix 带有索引的行矩阵

An IndexedRowMatrix is similar to a RowMatrix but with meaningful row indices. It is backed by an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local vector.

与行矩阵是类似的,只是添加了行的索引。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

// 创建带索引的RDD[IndexedRow]
val rows: RDD[IndexedRow] = sc.parallelize((Array(1,2,3),
Array(4,5,6),
Array(7,8,9))))
.map(_.split(' ')
.map(_.toDouble))
.map(line=>Vectors.dense(line))
.map((vd)=>(vd.size,vd))

// 通过上面的RDD[IndexedRow],创建索引行矩阵实例
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// 获得索引行矩阵的大小.
val m = mat.numRows()
val n = mat.numCols()

//打印类型
println(mat.getClass)

//打印内容数据
println(mat.rows.foreach(println))

// 去掉索引,转换成行矩阵.
val rowMat: RowMatrix = mat.toRowMatrix()


4.4 CoordinateMatrix 坐标矩阵

A CoordinateMatrix is a distributed matrix backed by an RDD of its entries. Each entry is a tuple of (i: Long, j: Long, value: Double), where i is the row index, j is the column index, and value is the entry value. A CoordinateMatrix should be used only when both dimensions of the matrix are huge and the matrix is very sparse.

坐标矩阵是一种带有坐标标记的矩阵,每一个具体数据都有一个坐标进行标识,格式是(x:Long,y:Long,value:Double),其中x,y为坐标,value为一个双精度的值

当行与列的维度都很大并且数据较为分散(含有某个值特别多)的时候应使用CoordinateMatrix

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

//创建坐标矩阵格式
val entries: RDD[MatrixEntry] =
sc.parallelize((Array(1,2,3),
Array(4,5,6),
Array(7,8,9))))
.map(_.split(' ')
.map(_.toDouble))
.map(vue=>(vue(0).toLong,vue(1).toLong,vue(2)))
.map(vue2=>new MatrixEntry(vue2_1,vue2_2,vue2_3))

// 创建一个 CoordinateMatrix 通过 RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// 获得它的大小
val m = mat.numRows()
val n = mat.numCols()

//打印坐标矩阵的内容数据
println(mat.entries.foreach(println)

// 将坐标矩阵转变为索引行矩阵(每一行都是稠密向量)
val indexedRowMatrix = mat.toIndexedRowMatrix()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: