SparkSqlForTest
2015-12-02 07:37
330 查看
package week4
/**
* Created by Administrator on 2015/3/31.
*/
import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
//import com.hadoop.mapreduce.LzoTextInputFormat
/** Empty companion class; all behaviour lives in the `SQLOnSpark` object. */
class SQLOnSpark {}

/**
 * Small Spark SQL demo job.
 *
 * It loads two tab-separated text inputs, registers them as the tables
 * `log_device_appu` and `d_app_list`, dumps the first column of the log table
 * as text, and writes the inner join of both tables (on `packag`) as Parquet.
 */
object SQLOnSpark {
  // Row schemas. Every column is kept as a String exactly as read from the file.
  case class Person(name: String, age: Int)
  case class log_device_appu(client_id: String, chl: String, mod: String, region_id: String, packag: String, dur: String, client_time: String, ext1: String)
  case class appstore_register(client_id: String, version: String, ip: String, region_id: String, isp_id: String, register_time: String, last_login_time: String, day: String)
  case class app_list(id: String, packag: String, package_name: String, package_type: String, display_order: String, category: String, create_time: String)
  case class d_channel(channel: String, name: String, typ: String, level: String, full_type: String, category: String, static_channel: String)
  case class d_model(channel: String, model: String, alias: String, static_model: String)

  /**
   * Job entry point.
   *
   * @param args positional arguments:
   *             - `args(0)`: not read by the active code (only by commented-out variants);
   *             - `args(1)`: output directory for the `client_id: ...` text dump;
   *             - `args(2)`: tab-separated `d_app_list` input path;
   *             - `args(3)`: output path for the joined result in Parquet format.
   * @throws IllegalArgumentException if fewer than four arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException
    // raised halfway through the job.
    require(
      args.length >= 4,
      "usage: SQLOnSpark <unused> <client_id text output> <d_app_list input> <parquet output>")

    val sparkConf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(sparkConf)
    try {
      // val conf = sc.hadoopConfiguration
      // conf.set("io.compression.codecs","com.hadoop.compression.lzo.LzopCodec")
      import org.apache.spark.sql.SQLContext
      val sqlContext: SQLContext = new SQLContext(sc)
      // Brings the implicit RDD[case class] -> SchemaRDD conversion into scope.
      import sqlContext._

      // --- log_device_appu ---
      // NOTE(review): the input path is the hard-coded relative path "appu", not a
      // command-line argument — confirm whether this is intentional.
      // split with limit -1 keeps trailing empty columns; the default split drops
      // them, which made p(7) fail on rows whose last fields are empty.
      val gc = sc.textFile("appu")
        .map(_.split("\t", -1))
        .map(p => log_device_appu(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7)))
      // val gc=sc.textFile(args(0)).map(_.split("\t")).map(p=>appstore_register(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7)))
      // val gc_lzo = sc.newAPIHadoopFile(args(0),classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
      // val lzoRDD = gc_lzo.map(_.split("\t")).map(p=>appstore_register(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7)))
      gc.registerAsTable("log_device_appu")

      val gc_sql = sqlContext.sql("SELECT * FROM log_device_appu")
      // gc_sql.map(t => "client_id: " + t(0)).collect().foreach(println)
      // Collapse to a single partition (no shuffle) so the dump lands in one part file.
      gc_sql.map(t => "client_id: " + t(0)).coalesce(1, shuffle = false).saveAsTextFile(args(1))

      // --- d_app_list ---
      val app = sc.textFile(args(2))
        .map(_.split("\t", -1))
        .map(x => app_list(x(0), x(1), x(2), x(3), x(4), x(5), x(6)))
      app.registerAsTable("d_app_list")
      // d_app_list.map(t => "id: " + t(0)).collect().foreach(println)
      // val join = sqlContext.sql("SELECT log_device_appu.client_id FROM log_device_appu join d_app_list where d_app_list.id=log_device_appu.client_id")

      // --- d_channel (disabled) ---
      // val channel=sc.textFile("app_list").map(_.split("\t")).map(y=>d_channel(y(0),y(1),y(2),y(3),y(4),y(5),y(6)))
      // channel.registerAsTable("d_channel")
      // val channel_sql = sqlContext.sql("SELECT * FROM d_channel")
      // channel_sql.map(t => "channel: " + t(0)).collect().foreach(println)

      // --- d_model (disabled) ---
      // val model=sc.textFile("d_model").map(_.split("\t")).map(y=>d_model(y(0),y(1),y(2),y(3)))
      // model.registerAsTable("d_model")
      // val model_sql = sqlContext.sql("SELECT * FROM d_model")
      // channel_sql.map(t => "d_model: " + t(0)).collect().foreach(println)

      // Join of the two registered tables on the package column, tagged with the
      // literal period '2015M02', written out as Parquet.
      sqlContext.sql("SELECT '2015M02',log_device_appu.packag FROM log_device_appu " +
        "inner join d_app_list on log_device_appu.packag=d_app_list.packag").saveAsParquetFile(args(3))

      // Four-table join; per the original author's note it only runs on Spark 1.2.
      // val join_d = sqlContext.sql("SELECT '2015M02', log_device_appu.packag,d_channel.static_channel,d_model.static_model," +
      // "count(distinct(log_device_appu.client_id)) " +
      // "FROM log_device_appu " +
      // "inner join d_app_list on log_device_appu.packag=d_app_list.packag " +
      // "inner join d_channel on log_device_appu.chl=d_channel.channel " +
      // "inner join d_model on d_channel.channel=d_model.channel and d_model.model=log_device_appu.mod " +
      // "where log_device_appu.ext1>='20150201' and log_device_appu.ext1<='20150228' " +
      // "and log_device_appu.dur<(12*3600*1000) " +
      // "group by log_device_appu.packag,d_channel.static_channel,d_model.static_model"
      // ).collect().foreach(print)
    } finally {
      // Always release the context, even when a stage fails.
      sc.stop()
    }
  }
}
/**
* Created by Administrator on 2015/3/31.
*/
import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
//import com.hadoop.mapreduce.LzoTextInputFormat
/** Empty companion class; all behaviour lives in the `SQLOnSpark` object. */
class SQLOnSpark {}

/**
 * Small Spark SQL demo job.
 *
 * It loads two tab-separated text inputs, registers them as the tables
 * `log_device_appu` and `d_app_list`, dumps the first column of the log table
 * as text, and writes the inner join of both tables (on `packag`) as Parquet.
 */
object SQLOnSpark {
  // Row schemas. Every column is kept as a String exactly as read from the file.
  case class Person(name: String, age: Int)
  case class log_device_appu(client_id: String, chl: String, mod: String, region_id: String, packag: String, dur: String, client_time: String, ext1: String)
  case class appstore_register(client_id: String, version: String, ip: String, region_id: String, isp_id: String, register_time: String, last_login_time: String, day: String)
  case class app_list(id: String, packag: String, package_name: String, package_type: String, display_order: String, category: String, create_time: String)
  case class d_channel(channel: String, name: String, typ: String, level: String, full_type: String, category: String, static_channel: String)
  case class d_model(channel: String, model: String, alias: String, static_model: String)

  /**
   * Job entry point.
   *
   * @param args positional arguments:
   *             - `args(0)`: not read by the active code (only by commented-out variants);
   *             - `args(1)`: output directory for the `client_id: ...` text dump;
   *             - `args(2)`: tab-separated `d_app_list` input path;
   *             - `args(3)`: output path for the joined result in Parquet format.
   * @throws IllegalArgumentException if fewer than four arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException
    // raised halfway through the job.
    require(
      args.length >= 4,
      "usage: SQLOnSpark <unused> <client_id text output> <d_app_list input> <parquet output>")

    val sparkConf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(sparkConf)
    try {
      // val conf = sc.hadoopConfiguration
      // conf.set("io.compression.codecs","com.hadoop.compression.lzo.LzopCodec")
      import org.apache.spark.sql.SQLContext
      val sqlContext: SQLContext = new SQLContext(sc)
      // Brings the implicit RDD[case class] -> SchemaRDD conversion into scope.
      import sqlContext._

      // --- log_device_appu ---
      // NOTE(review): the input path is the hard-coded relative path "appu", not a
      // command-line argument — confirm whether this is intentional.
      // split with limit -1 keeps trailing empty columns; the default split drops
      // them, which made p(7) fail on rows whose last fields are empty.
      val gc = sc.textFile("appu")
        .map(_.split("\t", -1))
        .map(p => log_device_appu(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7)))
      // val gc=sc.textFile(args(0)).map(_.split("\t")).map(p=>appstore_register(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7)))
      // val gc_lzo = sc.newAPIHadoopFile(args(0),classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
      // val lzoRDD = gc_lzo.map(_.split("\t")).map(p=>appstore_register(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7)))
      gc.registerAsTable("log_device_appu")

      val gc_sql = sqlContext.sql("SELECT * FROM log_device_appu")
      // gc_sql.map(t => "client_id: " + t(0)).collect().foreach(println)
      // Collapse to a single partition (no shuffle) so the dump lands in one part file.
      gc_sql.map(t => "client_id: " + t(0)).coalesce(1, shuffle = false).saveAsTextFile(args(1))

      // --- d_app_list ---
      val app = sc.textFile(args(2))
        .map(_.split("\t", -1))
        .map(x => app_list(x(0), x(1), x(2), x(3), x(4), x(5), x(6)))
      app.registerAsTable("d_app_list")
      // d_app_list.map(t => "id: " + t(0)).collect().foreach(println)
      // val join = sqlContext.sql("SELECT log_device_appu.client_id FROM log_device_appu join d_app_list where d_app_list.id=log_device_appu.client_id")

      // --- d_channel (disabled) ---
      // val channel=sc.textFile("app_list").map(_.split("\t")).map(y=>d_channel(y(0),y(1),y(2),y(3),y(4),y(5),y(6)))
      // channel.registerAsTable("d_channel")
      // val channel_sql = sqlContext.sql("SELECT * FROM d_channel")
      // channel_sql.map(t => "channel: " + t(0)).collect().foreach(println)

      // --- d_model (disabled) ---
      // val model=sc.textFile("d_model").map(_.split("\t")).map(y=>d_model(y(0),y(1),y(2),y(3)))
      // model.registerAsTable("d_model")
      // val model_sql = sqlContext.sql("SELECT * FROM d_model")
      // channel_sql.map(t => "d_model: " + t(0)).collect().foreach(println)

      // Join of the two registered tables on the package column, tagged with the
      // literal period '2015M02', written out as Parquet.
      sqlContext.sql("SELECT '2015M02',log_device_appu.packag FROM log_device_appu " +
        "inner join d_app_list on log_device_appu.packag=d_app_list.packag").saveAsParquetFile(args(3))

      // Four-table join; per the original author's note it only runs on Spark 1.2.
      // val join_d = sqlContext.sql("SELECT '2015M02', log_device_appu.packag,d_channel.static_channel,d_model.static_model," +
      // "count(distinct(log_device_appu.client_id)) " +
      // "FROM log_device_appu " +
      // "inner join d_app_list on log_device_appu.packag=d_app_list.packag " +
      // "inner join d_channel on log_device_appu.chl=d_channel.channel " +
      // "inner join d_model on d_channel.channel=d_model.channel and d_model.model=log_device_appu.mod " +
      // "where log_device_appu.ext1>='20150201' and log_device_appu.ext1<='20150228' " +
      // "and log_device_appu.dur<(12*3600*1000) " +
      // "group by log_device_appu.packag,d_channel.static_channel,d_model.static_model"
      // ).collect().foreach(print)
    } finally {
      // Always release the context, even when a stage fails.
      sc.stop()
    }
  }
}
相关文章推荐
- Mysql几种索引类型的区别及适用情况
- oracle 查看表空间的脚本
- 数据库操作事务小结
- 删除mysql服务
- Oracle 数据库中 十六进制转换为number类型的函数
- mysql更改root密码
- Oracle一些基本术语英汉对照
- 批处理bat文件调用oracle sql文件并传入参数
- mysql学习(1)
- mysql数据库学习
- MySQL查询优化处理
- 启动mysql错误on Mac
- PHP中安装使用mongodb数据库
- Oracle视图可以进行DML操作的条件
- mysql中去除换行以及回车
- sqlite3 实际运用
- swoole 连接多台 redis示例
- Redis的VM实现——终究敌不过业务架构师!
- 高性能mysql之schema与数据类型优化
- 为IntelliJ IDEA安装MySQL驱动