Spark SQL 2.3.0:深入浅出

所需积分/C币:37 2018-08-20 17:27:59 160KB PDF
收藏 收藏 1
举报

Spark SQL 2.3.0:深入浅出,看了下,还行,希望对大家有帮助
val peopleDF spark read format(" json"). load("/data/people. json") //输出 Schema信 peopleD. printschema() //展示结果---默认展示20条 peop1eDF·show() //查询某一列 peopleDF. select(peopleDF( name")) /′过滤出大于20岁的人 peopleDF. filter( age >20"). show() 输出结果如下: //输出 Schema信息 root - age: long (nullable true) ame: string (nullable true) sex: string (nullable true) //展示结果---默认展示20条 age lname sex 18 张三 man 28|李四| female nu1王五|man //查询某一列 十 name 张三 李四 王五 /′过滤出大于20岁的人 age name I seX 28|李四| female Data Frame&RDD互操作 反射的方式 此种方式需要定义一个 object类,在 spark中可以直接使用 case class 来实现 编程代码 //定义 object case class people(id: Long, name: String, sex: String) val info spark sparkContext textFile( "/data/info. txt) 1此处一定要导入隐式转换,否则无法使用toDE()方法 import spark. implicits val infoDf infomap(.split( )).map(line = people (line(0).toLong, line(1), line (2))) tODF( infoR。show() 输出结果 十 d name Seⅹ 1 zhangsan man lisi female wangwu man 、编程方式 1.将一个基本的RDD转换为一个 ROW RDD; 2.定义 Schema信息; 3.调用 SparkSession createDataFrame方法创建 Data Frame; 编程代码 将RDD转换为 ROW RDD val info spark sparkContext textFile("/data/info.txt val infoRDD info map( , split(,")).map(line => Row(line(0).toLong 1ine(1);1ine(2))) 定义 strctrype val structType StructType (Array( StructFleld( id", LongType, true), StructField("name", StringType, true) StructField( sex", StringType, true) //调用方法创建 DataFrame val infoDF spark createDataFrame(infoRDD, structType 输出结果 infoDF. show() 输出结果 name seX 1 zhangsan man 23 lisi female wangwu man 三、反射&编程方式对比小结 1.如果已知数据结构,可以使用方式一反射的方式将rdd转换为 Dataframe,但是此种方式依赖于 case class,spak中关于样例类 的属性是有限制的,当字段过多的时候反射这种方式就无法使用。 2.在数据结构未知时,可以使用第二种方式,此种方式编程较第一种方 式稍过繁琐,但使用范围更广一些。 Dataframe dataset Dataset概述 Dataset是从spak16后提出的新接口,是一个分布式的数据集合,提供 RDD的优势以及 Spark sQl优化执行的特点。 DataFrame转换为 Dataset Data Frame直接调用as方法就可以转换为 Dataset。 编程代码 定义 case class case class people(name: String, age: Long, sex: String) val peopleD spark read format( ison"). load("/data/people. json") /导入隐式转换可以调用as方法将data£rame转换为 dataset import spark, implicits val peopleD peopleD as [people] peopleS. map(line = i line. name 3) show() 输出结果 value 张三| 李四 王五 External data source 操作 Parquet/Json/Csv文件 · Parquet/Json/Csν在 Spark sQL是内部数据源,因此在读取此类文 件时,我们只需修改 format0)类型即可完成对此类数据的加载处 理。 ·针对一些外部数据源,我们需要加载外部的工具类——外部数据 源,然后依然是修改我们的 format0类型即可。 编程代码 ′指定 format类型为 parquet/json/csv即可加载相应文件类型的数据本例以 parquet为例 val userDF spark read format(" parquet").load(" /data/users. parquet") //输出结果 userDF. show() 输出结果: + name favorite color favorite numbers Alyssa nu1[3,9,15,201 Ben red 操作Hive数据 前提:需要将hive的site配置文件拷贝到 spark的conf目录下,并在 Spark sQL工程中引入mysq工具类的依赖。 编程代码 /加载hive中的数据 vaⅠ hived= spark. table( haiveta⊥e) //可以直接调用 Dataframe的API也可以通过sqg1来执行 hiveD. select(∴ spark.sql( select 将数据写入到新的 table中 result write. saveAsTable(tableName 操作 MySQL数据 编程代码 //指定 froma类型为:jdbc //配置mysq1的ur1、用户名、密码、 database等信息 val mysq ldf spark read. format(jabc").options( Map( url"->jdbc mysql: //localhost: 3306/databank user root passwor datable databank task" driver"->com mysqljdbc Driver )·1oad() //查询数据 mysqlDF. select(" task name",camps"," platform"). show() 输出结果 task name camps platform 新建任务 416 mobile 新建任务 416 mobile 新建任务 416 mobile 440 440 mobile 操作外部数据源总结 读取数据范式: spark.read. format( format).load(path)。 ●保存数据范式:sparκ. write. format( format).save(path)使用时如 果是内部数据源则只需要修改相应的 format类型即可,如果是外部 数据源,则在 option中添加配置相应的jar工具类即可。 Spark SQL实战 项目需求 统计曝光最多的广告TOP八。No 按地市统计曝光最多的广告TOP 日志内容构成 日期,项目,媒体1D,广告位1D,平台类型,广告D,创意D,城 市code。 180730,474r1646r23147,1r1825,1927:1156370100 180730,474;1118,23251,1;1825,1927,1156370300 180730,474,2751229691;1825,1927,1156330100 180730,474164623172,1r1825,1927:1156330200 180730,474;1646,23156;11825,1927;1156510100 180730,474164623086,11826,1928:1156440300 180730,474,2083;23372,11825,1927r1156320400 180730,474:1646;23085,1;1825,1927,1156440100 180730,474r1646r23122,1r1825,1927,1156430400 180730,474,1646;23173,11825,1927,1156331000 备注:地域相关信息是通过解析j地址获得,可参考 database 需求实现 统计曝光最多的广告ToPN 由于已知日志的数据结构,所以本来采用csVv的方式读取日志数据 编程代码: //定义 schema信息 val structType StructType(Array( StructField("date", Stringfype, true), StructField("camp", StringType, true), StructFleld( "media",StringType, true) StructFleld("placement, stringType, true)r StructField( platform", StringType, true), StructField(" ad", StringType, true)r StructField( create", StringType, true) StructField( "city, StringType, true //以csv的方式加载数据 val viewLog spark, read schema(structrype) format(" CSV").load("/data/view/180730/) ′也可才有另外一种简略写法加载csv数据 val viewLog spark read schema(structType) csv("/data/view/180730/ /按照天、广告分组统计曝光最多的广告 viewlog groupBy( date","ad").agg(count("ad") as(times")). show() 输出结果: date ad times 十 1807301825179375 180730182628803 将结果保存到 MySQL中-示例代码如下: 调用 foreachpartition完成对结果数据的入库操作 viewAdTopNDF. foreachPartition(it = i val list = new ListBuffer[ DayvideoAccess] it foreach(row => t val day row getAs[String]("day) val adId row getAs[Long](adid") val times row getAS [Long]( times ") list append(dayvideoAccessstat(day, adId, times )) ′创建一个DAO类操作MySo实现对结果数据的保存操作 StatDao. insertDayvideoAccessTopN(list) 按地市统计曝光最多的广告ToPN 此处可以借助 DataFrame的窗口函数的使用完成。 编程代码 ′加载数据的操作已经在上一步完成,在此不再多余赘述 ′按照日期、城市、广告分组,然后在借助 window函数实现分城市topN val cityAdTopNDF viewLog groupBy( date","city,ad").agg(count("ad").as( times")). show() val top2 =cityAdTopNDF, select( cityAdtopNDF date") cityAdTopNDF( " city) cityAdtopNDF("ad") row number(). over(Window. partitionBy("city).orderBy(cityAdTopNDF(times") ).filter("times rank <=2"). show()

...展开详情
试读 12P Spark SQL 2.3.0:深入浅出
立即下载 低至0.43元/次 身份认证VIP会员低至7折
一个资源只可评论一次,评论内容不能少于5个字
dick1305 很不错,谢谢分享。
2019-03-06
回复
伍玖59 很好的资料,短小精悍~!内容很多干货
2019-03-03
回复
  • 签到新秀

    累计签到获取,不积跬步,无以至千里,继续坚持!
关注 私信 TA的资源
上传资源赚积分,得勋章
最新推荐
Spark SQL 2.3.0:深入浅出 37积分/C币 立即下载
1/12
Spark SQL 2.3.0:深入浅出第1页
Spark SQL 2.3.0:深入浅出第2页
Spark SQL 2.3.0:深入浅出第3页

试读结束, 可继续读1页

37积分/C币 立即下载 >