您的位置:首页 > 其它

IgniteRDD学习笔记(二)IgniteContext和IgniteRDD

2015-10-26 15:00 246 查看
IgniteContext:

IgniteContext是spark-ignite的主要实例,而创建IgniteContext需要SparkContext。IgniteContext会确保server和client端的IgniteNode存在每个参与job的实例。

当创建Context的实例时,会传给Context构造函数一个boolean类型的值(默认为true),通常是用于部署安装。

当为false时,环境将运行在嵌入式模式并且会通过context构造函数在每个worker上启动serverNode。

当IgniteContext创建成功后,通过fromCache方法可以获得IgniteRDD。当创建RDD的时候Cache不是必须存在与集群中的,此时可以通过配置文件或模版文件创建Cache.

通过默认的Ignite configuration来创建IgniteCache:

val igniteContext = new IgniteContext[Integer, Integer](sparkContext,
() => new IgniteConfiguration())


通过模版配置文件来创建IgniteCache:

val igniteContext = new IgniteContext[Integer, Integer](sparkContext,
"examples/config/example-cache.xml")


IgniteRDD:
IgniteRDD是SparkRDD的一个抽象实例,以IgniteCache的形式存在。

IgniteRDD是可变的,所有的IgniteCache的更改(无论是因其他的RDD还是外部的cache的原因造成的改变)都能立即让RDD的使用者看到。

IgniteRDD利用分区缓存并且提供分区信息给spark的executor,IgniteRDD的分区数量等同于底层IgniteCache的数量。

通过
getPrefferredLocations
get方法可以得到计算所需要的数据位置。

从Ignite中读取内容:

因为缓存中已经有了IgniteRDD,所以不需要再从Ignite给spark加载数据,当IgniteRDD创建时,所有的RDD方法都可以直接使用。

例如,假设有一个名为"partitioned"的IgniteCache包含字符串,下面的代码会获取所有包含字符串"Ignite"的值

val cache = igniteContext.fromCache("partitioned")
val result = cache.filter(_._2.contains("Ignite")).collect()
给Ignite里写入内容:

因为IgniteCache是以键值对的方式存在,可以直接通过Spark
tuple RDD 和
savePairs
方法来保存数据。该方法会利用RDD分区并且以类似的方式存储值。

还可以通过
saveValues
方法只保存RDD的值到Cache中,IgniteRDD会给每个存储在Cache的值生成一个独一无二的键

例如,下面的代码是以一对的方式将1到10000的值存储到缓存"partitoned"中,十个并行任务:

val cacheRdd = igniteContext.fromCache("partitioned")

cacheRdd.savePairs(sparkContext.parallelize(1 to 10000, 10).map(i => (i, i)))
对IgniteCache执行SQL:

当cache中配置了启动索引子系统,可以通过标准SQL和SQL方法来进行查询。

例如,假设"partitoned"Cache配置了整数对的索引,下面的代码会获得(10,100)中所有的整数:

val cacheRdd = igniteContext.fromCache("partitioned")

val result = cacheRdd.sql(
"select _val from Integer where val > ? and val < ?", 10, 100)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: