
SparkSqlForTest

2015-12-02 07:37
package week4

/**

 * Created by Administrator on 2015/3/31.

 */

import java.text.SimpleDateFormat

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.SparkContext._

import org.apache.spark.sql._

//import com.hadoop.mapreduce.LzoTextInputFormat

class SQLOnSpark{}

object SQLOnSpark{

  case class Person(name:String,age:Int)

  case class log_device_appu(client_id:String,chl:String,mod:String,region_id:String,packag:String,dur:String,client_time:String,ext1:String)

  case class appstore_register(client_id:String,version:String,ip:String,region_id:String,isp_id:String,register_time:String,last_login_time:String,day:String)

  case class app_list(id:String,packag:String,package_name:String,package_type:String,display_order:String,category:String,create_time:String)

  case class d_channel(channel:String,name:String,typ:String,level:String,full_type:String,category:String,static_channel:String)

  case class d_model(channel:String,model:String,alias:String,static_model:String)

  def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setAppName("test").setMaster("local")

      val sc = new SparkContext(sparkConf)

//    val conf = sc.hadoopConfiguration

//    conf.set("io.compression.codecs","com.hadoop.compression.lzo.LzopCodec")

      import org.apache.spark.sql.SQLContext

      val sqlContext: SQLContext = new SQLContext(sc)

      import sqlContext._   // pulls in the createSchemaRDD implicit so the case-class RDDs below can be registered as tables

      //log_device_appu

    val gc=sc.textFile("appu").map(_.split("\t")).map(p=>log_device_appu(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7)))
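    // Note (not part of the original post): any input line with fewer than 8 tab-separated
    // fields makes p(7) throw ArrayIndexOutOfBoundsException; a defensive variant could
    // filter first, e.g.
    //   sc.textFile("appu").map(_.split("\t")).filter(_.length >= 8)
    //     .map(p => log_device_appu(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7)))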

//      val gc=sc.textFile(args(0)).map(_.split("\t")).map(p=>appstore_register(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7)))

//    val gc_lzo = sc.newAPIHadoopFile(args(0),classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])

//    val lzoRDD = gc_lzo.map(_.split("\t")).map(p=>appstore_register(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7)))

      gc.registerAsTable("log_device_appu")

      val gc_sql = sqlContext.sql("SELECT * FROM log_device_appu")

//      gc_sql.map(t => "client_id: " + t(0)).collect().foreach(println)

    gc_sql.map(t => "client_id: " + t(0)).coalesce(1,false).saveAsTextFile(args(1))
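      // Optional sanity check (not in the original post): the SchemaRDD returned by sql()
      // exposes printSchema(), so the inferred schema and a few rows can be inspected
      // before writing the full output:
      //   gc_sql.printSchema()
      //   gc_sql.take(5).foreach(println)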

//    d_app_list

      val app=sc.textFile(args(2)).map(_.split("\t")).map(x=>app_list(x(0),x(1),x(2),x(3),x(4),x(5),x(6)))

      app.registerAsTable("d_app_list")

      val d_app_list = sqlContext.sql("SELECT * FROM d_app_list")

//      d_app_list.map(t => "id: " + t(0)).collect().foreach(println)

//      val join =  sqlContext.sql("SELECT log_device_appu.client_id FROM log_device_appu join d_app_list where d_app_list.id=log_device_appu.client_id")

    //d_channel

//    val channel=sc.textFile("app_list").map(_.split("\t")).map(y=>d_channel(y(0),y(1),y(2),y(3),y(4),y(5),y(6)))

//    channel.registerAsTable("d_channel")

//    val channel_sql = sqlContext.sql("SELECT * FROM d_channel")

//    channel_sql.map(t => "channel: " + t(0)).collect().foreach(println)

    //d_model

//    val model=sc.textFile("d_model").map(_.split("\t")).map(y=>d_model(y(0),y(1),y(2),y(3)))

//    model.registerAsTable("d_model")

//    val model_sql = sqlContext.sql("SELECT * FROM d_model")

//    channel_sql.map(t => "d_model: " + t(0)).collect().foreach(println)

 // Join log_device_appu with d_app_list and save the result as a Parquet file

    val join_d =  sqlContext.sql("SELECT '2015M02',log_device_appu.packag FROM log_device_appu " +

      "inner join d_app_list on log_device_appu.packag=d_app_list.packag").saveAsParquetFile(args(3))

// Four-table join: this query only runs on Spark 1.2 or later

//        val join_d =  sqlContext.sql("SELECT '2015M02', log_device_appu.packag,d_channel.static_channel,d_model.static_model," +

//          "count(distinct(log_device_appu.client_id)) " +

//          "FROM log_device_appu " +

//          "inner join d_app_list on log_device_appu.packag=d_app_list.packag " +

//          "inner join d_channel on log_device_appu.chl=d_channel.channel " +

//          "inner join d_model on d_channel.channel=d_model.channel and d_model.model=log_device_appu.mod " +

//          "where log_device_appu.ext1>='20150201' and log_device_appu.ext1<='20150228' " +

//          "and log_device_appu.dur<(12*3600*1000) " +

//          "group by log_device_appu.packag,d_channel.static_channel,d_model.static_model"

//          ).collect().foreach(print)

  }
}
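The comment above notes that the four-table join only works from Spark 1.2 onward. As a hedged aside (not part of the original post), on Spark 1.3 or later the same table can be registered through the DataFrame API; the snippet below is a minimal sketch reusing the log_device_appu case class defined above:

import sqlContext.implicits._   // Spark 1.3+: enables rdd.toDF()

val gcDF = sc.textFile("appu")
  .map(_.split("\t"))
  .map(p => log_device_appu(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7)))
  .toDF()
gcDF.registerTempTable("log_device_appu")   // replaces the pre-1.3 registerAsTable

// From Spark 1.4 onward the Parquet save can be written as:
//   sqlContext.sql("SELECT * FROM log_device_appu").write.parquet(args(3))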