### 5.3 选择、筛选 与 聚合
from pyspark import SparkConf
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
conf = SparkConf().setAppName("PySpark 的数据读写").setMaster('spark://192.168.126.10:7077')
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)
# ----------------- 一、HDFS的数据读取-------------------
ratings = spark.read.csv('hdfs://192.168.126.10:9000/data/ratings.csv', header=True) # 数据集包含标题行
ratings.describe().show() # 查看数据集的基本数据信息
ratings.printSchema() # 输出数据集的结构
ratings.show(5)
# ******** 1.转换数据类型 ********
ratings = ratings.withColumn('rating', ratings.rating.cast('double')) # 转换rating的数据类型为double双精度型,重新命名为ratings列
ratings = ratings.withColumn("date",
from_unixtime(ratings.timestamp.cast("bigint"), 'yyyy-MM-dd')) # 将timestamp转为日期格式,新增一列date
ratings.show(5)
ratings = ratings.withColumn("date", ratings.date.cast("date")) # 将 date列转换为日期类型date,重新命名为date列
ratings = ratings.drop("timestamp") # 删除timestamp列
ratings.printSchema() # 查看DataFrame的结构
ratings.show(5)
# ********* 2.读取数据时自动识别数据类型 ************
# 数据集包含标题行, inferSchema=True自动识别数据类型
movies = spark.read.csv('hdfs://192.168.126.10:9000/data/movies.csv', header=True, inferSchema=True)
movies.printSchema()
movies.show(5)
ratings_movies = ratings.join(movies, ratings.movieId == movies.movieId, "inner") \
.select("userId", ratings.movieId, 'title', 'date', 'rating')
ratings_movies.show(5)
# ******* 3.使用udf函数实现复杂的数据处理逻辑 *********
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
# udf(用户自定义函数,User-Defined Function)是一个非常强大的工具,它允许你将Python函数应用到DataFrame的列上。
# 通过使用udf,实现更复杂的数据处理逻辑。
def isLike(v): # 定义普通的python函数,判断rating列,如果评分大于4,赋值True表示喜欢这部电影,否则复制False
if v > 4:
return True
else:
return False
udf_isLike = udf(isLike, BooleanType()) # 将创建的python函数注册为udf函数,类型为布尔型
ratings_movies = ratings_movies.withColumn("isLike", udf_isLike(
ratings_movies["rating"])) # 将rating列应用udf函数,将其分为True/False,结果存到新isLike列
ratings_movies.show(5)
# ******* 4.聚合函数 *************
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf("string", PandasUDFType.GROUPED_AGG) # 定义一个Pandas UDF,用于在分组聚合时将字符串列表合并为一个以逗号分隔的字符串
def fmerge(v):
return ','.join(v)
tags = spark.read.csv('hdfs://192.168.126.10:9000/data/tags.csv', header=True) # 数据集包含标题行
tags = tags.drop("timestamp") # 删除列timestamp
print("删除timestamp后的tags表:")
tags.show(5)
# ********groupBy 聚合********
tags_merge = tags.groupBy(["userId", "movieId"]).agg(fmerge(tags["tag"])) # 同一个用户对同一个电影的tag放在一起,以逗号隔开
print("聚合后:")
tags_merge.show(5, False)
tags_merge = tags_merge.withColumnRenamed("fmerge(tag)", "tags") # 将聚合的列:fmerge(tag), 改为:tags
print("列改为tags")
tags_merge.show(5)
final_join = ratings_movies.join(tags_merge, (ratings_movies.movieId == tags_merge.movieId) & (
ratings_movies.userId == tags_merge.userId)) \
.select(ratings_movies.userId, ratings_movies.movieId, 'title', 'date', 'tags', 'rating', 'isLike') \
.orderBy(['date'], ascending=True) # 选择
final_join.show(5)
final_join = final_join.filter(ratings.date > '2015-10-25') # 筛选,日期大于2015-10-25
final_join.printSchema() # 显示表的结构
final_join.show(5)
# ----------------- 二、存储数据:存放到HDFS系统中------------
# 使用了.coalesce(1)来减少分区数到1,这有助于避免生成多个小文件
final_join.coalesce(1).write.format('csv') \
.option('header', 'true') \
.mode('overwrite') \
.save('hdfs://192.168.126.10:9000/output/movie-out-csv.csv') # csv格式
final_join.coalesce(1).write.format('json') \
.mode('overwrite') \
.save('hdfs://192.168.126.10:9000/output/movie-out-json.json') # json格式
# ---------------- 三、存储数据:存放到Linux的MySQL中-----------------------
# 定义MySQL数据库的连接参数
url = "jdbc:mysql://192.168.126.10:3306/test"
properties = {
"user": "root",
"password": "123456",
"driver": "com.mysql.jdbc.Driver"
}
# 写数据到MySQL的test数据库的movie表,无需预先在MySQL创建表。自动在mysql创建movie表
final_join.coalesce(1).write.format('jdbc') \
.option('url', url) \
.option('dbtable', 'movie') \
.option('user', properties['user']) \
.option('password', properties['password']) \
.option('driver', properties['driver']) \
.option('mode', 'append') \
# .save() # 第一次建表时使用.save()。append为追加数据模式,overwrite覆盖数据模式
print('Spark将数据写入mysql完成。')
# ------------------- 五、 读取MySQL数据---------------------
# 读取MySQL的test数据库的movie表
print('Spark读取mysql数据库的数据\n')
movie_read = spark.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", "movie") \
.option("user", properties["user"]) \
.option("password", properties["password"]) \
.option("driver", properties["driver"]) \
.load()
movie_read.show(5)
# 如果用mysql客户端查询bit类型数据,可以采用bin查看:select bin(isLike) from movie limit 10;
# ------------------- 四、写数据到MySQL的函数封装-----------------------
# 封装将pyspark的dataframe写入mysql数据库的函数
from pyspark.sql import DataFrame
import pymysql
def write_to_mysql(df: DataFrame, url: str, properties: dict, table_name: str):
"""
将DataFrame写入MySQL数据库的指定表。检查表是否存在,存在则先删除表
参数:
- df: 要写入的DataFrame。
- url: MySQL数据库的JDBC URL。
- properties: 包含数据库连接信息的字典(user, password, driver)。
- table_name: 要写入的MySQL表名。
- mode: 写入模式('append'、'overwrite'等)。
"""
conn = pymysql.connect(host=url.split('/')[2].split(':')[0],
port=int(url.split('/')[2].split(':')[1].split('/')[0]),
user=properties['user'],
password=properties['password'],
database=url.split('/')[-1],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
try:
with conn.cursor() as cursor:
# 检查表是否存在
cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
if cursor.fetchone():
# 如果表存在,则删除它
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
conn.commit()
print(f"删除表:{table_name}")
finally:
conn.close()
# 写数据到MySQL
df.coalesce(1).write.format('jdbc') \
.option('url', url) \
.option('dbtable', table_name) \
.option('user', properties['user']) \
.option('password', properties['password']) \
.option('driver', properties['driver']) \
.option('mode', 'append') \
.save()
print(f'Spark将数据写入MySQL的{table_name}表完成。')
#
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
PYSPARK读写.zip (4个子文件)
PYSPARK读写
movies.csv 483KB
PySparkETL_MySQL和HDFS读写.py 8KB
tags.csv 116KB
ratings.csv 2.23MB
共 4 条
- 1
资源评论
侧耳倾听童话
- 粉丝: 169
- 资源: 14
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- rpi4b基于uboot通过nfs挂载最新主线Linux内核的注意事项
- Cocos2d-x教程视频TMX地图解析
- Cocos2d-x教程视频CocosStudio 2.0 文件格式解析
- 基于 Van.js 的简单前端路由组件(支持字符串和正则表达式匹配等).zip
- Cocos2d-x教程视频CocosStudio 2.0 容器控件
- 学习资源-07~11,备份
- (源码)基于Flink和Kafka的实时用户行为日志分析系统.zip
- (源码)基于Arduino的机器人避障系统.zip
- Cocos2d-x教程视频Cocos2d-x游戏实战项目开发记忆卡片
- (源码)基于FreeRTOS和RP2040的实时操作系统应用模板.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功