import os
from pyspark import SparkContext, SparkConf
# 锁定远端环境配置
from spark_core.fns.Fnn import *
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('RDD算子,即RDD函数、RDD-API的运用')
conf = SparkConf().setAppName(value='rdd-experience').setMaster('local')
sc = SparkContext(conf=conf)
# #######################################分区函数###################################
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)
# 1.foreachPartition
# rdd.foreachPartition(fn2)
# 2.mapPartitions
# rdd = rdd.mapPartitions(fn1).glom().collect()
# print(rdd)
# 3.增大分区
# rdd = rdd.repartition(5).glom().collect()
# print(rdd)
# 4.减少分区
# rdd = rdd.repartition(2).glom().collect()
# print(rdd)
# 5. coalesce改变RDD得分区数量, 得到一个新的RDD,
# 默认情况下,无法增大分区,无Shuffle,但是可以减少分区
# 参数1: 分区数
# 参数2:表示是否存在shuffle, 默认为false, 但是只能减少分区,为True可以增大分区
# rdd = rdd.coalesce(5).glom().collect()
# print(rdd)
# repartition本质上是coalesce的一种当参数2为True的简写方案,
# 因为repartition底层调用coalesce函数,将其参数2设置为True
# partitionBy: 专门针对kv类型重分区的函数
# 默认: 根据key进行Hash取模划分操作 ,也可自定义分区规则
rm = sc.parallelize(
[
(1, 'c01'), (2, 'c02'), (3, 'c03'),
(4, 'c04'), (5, 'c05'), (6, 'c06'),
(7, 'c07'), (8, 'c08'), (9, 'c09'),
(10, 'c10')], 3)
# rdd = rm.partitionBy(5).glom().collect()
# print(rdd)
# 自定义分区规则
# rm1 = rm.partitionBy(2, lambda key: 0 if key <= 5 else 1).glom().collect()
# print(rm1)
# #######################################聚合函数###################################
'''
reduce(fn): 根据传入的函数对数据进行聚合计算
fold(defaultAgg,fn): 根据传入的函数对数据进行聚合计算, 同时支持给agg设置初始值
aggregate(defaultAgg, fn1, fn2):
根据传入的函数对数据进行聚合计算, 参数1 设置agg的初始值,
fn1对RDD的各个分区内的数据进行聚合操作,
fn2负责将各个分区的聚合结果进行汇总处理
'''
# rdd = rdd.reduce(lambda agg, curr: agg + curr)
# rdd = rdd.fold(10, lambda agg, curr: agg + curr)
# rdd = rdd.aggregate(10, fn3, fn4)
# print(rdd)
# #######################################KV类型聚合函数###################################
rkd = sc.parallelize(
[
('c01', 1), ('c02', 2), ('c03', 3),
('c04', 4), ('c05', 5), ('c06', 6),
('c07', 7), ('c08', 8), ('c09', 9),
('c01', 10)], 3)
# rkd0 = rkd.glom().collect()
# rkd1 = rkd.reduceByKey(lambda agg, curr: agg + curr).collect()
# rkd2 = rkd.foldByKey(10, lambda agg, curr: agg + curr).collect()
# rkd3 = rkd.aggregateByKey(10, fn3, fn4).collect()
# rkd4 = rkd.groupByKey().mapValues(list).collect()
#
# print(rkd0)
# print(rkd1)
# print(rkd2)
# print(rkd3)
# print(rkd4)
# #######################################关联函数###################################
rdd1 = sc.parallelize([('c01', '张三'), ('c02', '李四'), ('c01', '王五'), ('c01', '赵六'), ('c02', '田七'), ('c03', '周八')])
rdd2 = sc.parallelize([('c01', '大数据1期'), ('c02', '大数据2期'), ('c04', '大数据4期')])
r1 = rdd1.join(rdd2).groupByKey().mapValues(list).collect()
r2 = rdd1.leftOuterJoin(rdd2).collect()
r3 = rdd2.rightOuterJoin(rdd1).collect()
r4 = rdd1.fullOuterJoin(rdd2).collect()
print(r1)
print(r2)
print(r3)
print(r4)
sc.stop()
没有合适的资源?快使用搜索试试~ 我知道了~
Spark - Resilient Distributed Datasets (RDDs)介绍
共2个文件
py:2个
需积分: 5 0 下载量 49 浏览量
2023-06-24
18:30:46
上传
评论
收藏 2KB ZIP 举报
温馨提示
RDD分区调整、聚合函数、关联函数的算子运用
资源推荐
资源详情
资源评论
收起资源包目录
spark_core.zip (2个子文件)
spark_core
RDDExperience.py 4KB
fns
Fnn.py 252B
共 2 条
- 1
资源评论
时下握今
- 粉丝: 18
- 资源: 8
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功