没有合适的资源?快使用搜索试试~ 我知道了~
1. spark sql 是用于操作结构化数据的程序包 2. Dataset :是一个分布式的数据集合 4. 在 spark 2.0 之后, SQLContex
资源详情
资源评论
资源推荐
2022/4/27 03_dataframe
huaxiaozhuan.com/工具/spark/chapters/03_dataframe.html 1/38
Spark SQL
一、概述
1.
spark sql
是用于操作结构化数据的程序包
通过 spark sql
,可以使用 SQL
或者
HQL
来查询数据,查询结果以 Dataset/DataFrame
的形式返回
它支持多种数据源,如 Hive
表、 Parquet
以及
JSON
等
它支持开发者将 SQL
和传统的 RDD
变成相结合
2.
Dataset :是一个分布式的数据集合
它是 Spark 1.6
中被添加的新接口
它提供了 RDD 的优点与 Spark SQL
执行引擎的优点
它在 Scala
和 Java
中是可用的。 Python
不支持 Dataset API 。但是由于 Python
的动态特性,许
多 DataSet API
的优点已经可用
3.
DataFrame :是一个 Dataset
组成的指定列。
它的概念等价于一个关系型数据库中的表
在 Scala/Python
中, DataFrame
由 DataSet
中的
RowS
(
多个 Row
)
来表示。
4.
在 spark 2.0
之后, SQLContext
被
SparkSession
取代。
二、
SparkSession
1.
spark sql
中所有功能的入口点是 SparkSession
类。它可以用于创建 DataFrame 、注册 DataFrame 为
table 、在 table
上执行 SQL 、缓存 table 、读写文件等等。
2.
要创建一个 SparkSession ,仅仅使用 SparkSession.builder
即可:
3.
Builder
用于创建 SparkSession ,它的方法有(这些方法都返回 self
):
.appName(name) :给程序设定一个名字,用于在 Spark web UI
中展示。如果未指定,则 spark
会随
机生成一个。
name :一个字符串,表示程序的名字
.config(key=None,value=None,conf=None) :配置程序。这里设定的配置会直接传递给 SparkConf
和
SparkSession 各自的配置。
key :一个字符串,表示配置名
value :对应配置的值
conf :一个 SparkConf
实例
有两种设置方式:
from pyspark.sql import SparkSession
spark_session = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2022/4/27 03_dataframe
huaxiaozhuan.com/工具/spark/chapters/03_dataframe.html 2/38
通过键值对设置:
通过已有的 SparkConf
设置:
.enableHiveSupport() :开启 Hive
支持。( spark 2.0
的新接口)
.master(master) :设置 spark master URL 。如:
master=local :表示单机本地运行
master=local[4] :表示单机本地
4
核运行
master=spark://master:7077 :表示在一个 spark standalone cluster
上运行
.getOrCreate() :返回一个已有的 SparkSession
实例;如果没有则基于当前 builder
的配置,创建
一个新的 SparkSession
实例
该方法首先检测是否有一个有效的全局默认 SparkSession
实例。如果有,则返回它;如果没有,
则创建一个作为全局默认 SparkSession 实例,并返回它
如果已有一个有效的全局默认 SparkSession
实例,则当前 builder 的配置将应用到该实例上
2.1
属性
1.
.builder = <pyspark.sql.session.Builder object at 0x7f51f134a110> :一个 Builder 实例
2.
.catalog :一个接口。用户通过它来 create、drop、alter、query 底层的数据库、 table
以及
function
等
可以通过 SparkSession.catalog.cacheTable('tableName') ,
来缓存表;通过
SparkSession.catalog.uncacheTable('tableName')
来从缓存中删除该表。
3.
.conf : spark
的运行时配置接口。通过它,你可以获取、设置 spark、hadoop
的配置。
4.
.read :返回一个 DataFrameReader ,用于从外部存储系统中读取数据并返回 DataFrame
5.
.readStream :返回一个 DataStreamReader ,用于将输入数据流视作一个 DataFrame
来读取
6.
.sparkContext :返回底层的 SparkContext
7.
.streams :返回一个 StreamingQueryManager 对象,它管理当前上下文的所有活动的 StreamingQuery
8.
.udf :返回一个 UDFRegistration ,用于 UDF
注册
9.
.version :返回当前应用的 spark
版本
2.2
方法
1.
.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True) :从 RDD
、一个列
表、或者 pandas.DataFrame
中创建一个 DataFrame
参数:
data :输入数据。可以为一个 RDD 、一个列表、或者一个 pandas.DataFrame
schema :给出了 DataFrame
的结构化信息。可以为:
SparkSession.builder.config("spark.some.config.option", "some-value")
SparkSession.builder.config(conf=SparkConf())
2022/4/27 03_dataframe
huaxiaozhuan.com/工具/spark/chapters/03_dataframe.html 3/38
一个字符串的列表:给出了列名信息。此时每一列数据的类型从 data
中推断
为 None :此时要求 data
是一个 RDD ,且元素类型为 Row、namedtuple、dict
之一。此时
结构化信息从 data
中推断(推断列名、列类型)
为 pyspqrk.sql.types.StructType :此时直接指定了每一列数据的类型。
为 pyspark.sql.types.DataType
或者 datatype string :此时直接指定了一列数据的类
型,会自动封装成 pyspqrk.sql.types.StructType (只有一列)。此时要求指定的类型与
data
匹配(否则抛出异常)
samplingRatio :如果需要推断数据类型,则它指定了需要多少比例的行记录来执行推断。如果
为 None ,则只使用第一行来推断。
verifySchema :如果为 True ,则根据 schema
检验每一行数据
返回值:一个 DataFrame 实例
2.
.newSession() :返回一个新的 SparkSession 实例,它拥有独立的 SQLConf 、 registered temporary
views and UDFs ,但是共享同样的 SparkContext 以及 table cache 。
3.
.range(start,end=None,step=1,numPartitions=None) :创建一个 DataFrame ,它只有一列。该列的列名
为 id ,类型为 pyspark.sql.types.LongType ,数值为区间 [start,end) ,间隔为 step (即:
list(range(start,end,step))
)
4.
.sql(sqlQuery) :查询 SQL
并以 DataFrame
的形式返回查询结果
5.
.stop() :停止底层的 SparkContext
6.
.table(tableName) :以 DataFrame 的形式返回指定的 table
三、
DataFrame
创建
1.
在一个 SparkSession
中,应用程序可以从一个已经存在的 RDD 、 HIVE 表、或者 spark 数据源中创建一个
DataFrame
3.1
从列表创建
1.
未指定列名:
结果为:
2.
指定列名:
结果为:
l = [('Alice', 1)]
spark_session.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)] #自动分配列名
l = [('Alice', 1)]
spark_session.createDataFrame(l, ['name', 'age']).collect()
2022/4/27 03_dataframe
huaxiaozhuan.com/工具/spark/chapters/03_dataframe.html 4/38
3.
通过字典指定列名:
结果为:
3.2
从
RDD
创建
1.
未指定列名:
结果为:
2.
指定列名:
结果为:
3.
通过 Row
来创建:
结果为:
[Row(name=u'Alice', age=1)]
d = [{'name': 'Alice', 'age': 1}]
spark_session.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]
rdd = sc.parallelize([('Alice', 1)])
spark_session.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)] #自动分配列名
rdd = sc.parallelize([('Alice', 1)])
spark_session.createDataFrame(rdd, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
from pyspark.sql import Row
Person = Row('name', 'age')
rdd = sc.parallelize([('Alice', 1)]).map(lambda r: Person(*r))
spark_session.createDataFrame(rdd, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
2022/4/27 03_dataframe
huaxiaozhuan.com/工具/spark/chapters/03_dataframe.html 5/38
4.
指定 schema :
结果为:
5.
通过字符串指定 schema :
结果为:
如果只有一列,则字符串 schema
为:
结果为:
3.3
从
pandas.DataFrame
创建
1.
使用方式:
结果为:
3.4
从数据源创建
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
rdd = sc.parallelize([('Alice', 1)])
spark_session.createDataFrame(rdd, schema).collect()
[Row(name=u'Alice', age=1)]
rdd = sc.parallelize([('Alice', 1)])
spark_session.createDataFrame(rdd, "a: string, b: int").collect()
[Row(name=u'Alice', age=1)]
rdd = sc.parallelize([1])
spark_session.createDataFrame(rdd, "int").collect()
[Row(value=1)]
df = pd.DataFrame({'a':[1,3,5],'b':[2,4,6]})
spark_session.createDataFrame(df).collect()
[Row(a=1, b=2), Row(a=3, b=4), Row(a=5, b=6)]
剩余37页未读,继续阅读
十二.12
- 粉丝: 35
- 资源: 276
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0