from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import datetime, timedelta
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, LongType
import sys
# 创建SparkSession
spark = SparkSession.builder \
.appName("Database Query") \
.enableHiveSupport() \
.getOrCreate()
# 数据库连接信息
DB_HOST = 'mysql的ip'
DB_PORT = 3306 # MySQL默认端口号
DB_NAME = 'mysql的库名'
DB_USER = 'mysql的用户名'
DB_PASSWORD = 'mysql的密码'
# 检查的日期和月份
#DATE_TO_CHECK = '2024-05-22'
#MONTH_TO_CHECK = '202403'
# 计算昨天的日期
yesterday = datetime.now() - timedelta(1)
DATE_TO_CHECK = yesterday.strftime('%Y-%m-%d')
DATE_TO_CHECK_YMD = yesterday.strftime('%Y%m%d')
# 计算上一个月的月份
last_month = datetime.now().replace(day=1) - timedelta(days=1)
MONTH_TO_CHECK = last_month.strftime('%Y%m')
# 表名和相应的SQL查询语句
TABLE_QUERIES = {
'工作流名称': "SELECT COUNT(1) FROM 表名 WHERE create_date = '{}'".format(DATE_TO_CHECK)
}
# 表名和相应的SQL查询语句(Hive)
HIVE_TABLE_QUERIES = {
'工作流名称': "SELECT COUNT(1) FROM hive库名.hive表名 WHERE ds = '{}'".format(DATE_TO_CHECK_YMD)
}
def execute_mysql_query(query):
try:
# 从数据库加载数据
df = spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://{DB_HOST}:{DB_PORT}/{DB_NAME}") \
.option("dbtable", f"({query}) as temp") \
.option("user", DB_USER) \
.option("password", DB_PASSWORD) \
.option("driver", "com.mysql.jdbc.Driver") \
.load()
# 获取查询结果
result = df.select(col("count(1)").alias("count")).collect()[0]["count"]
return result
except Exception as e:
print("Error executing query:", e)
if hasattr(e, 'java_exception'):
java_exception = e.java_exception
print("Java exception details:", java_exception)
# 输出 Java 异常的错误消息和堆栈信息
print("Java exception message:", java_exception.getMessage())
print("Java exception stack trace:", java_exception.getStackTrace())
return None
def execute_hive_query(query):
try:
# 执行Hive查询
result = spark.sql(query).collect()[0][0]
return result
except Exception as e:
print("Error executing Hive query:", e)
return None
def save_to_mysql(script_names, db_types, run_statuses):
url = "jdbc:mysql://" + DB_HOST + ":" + str(DB_PORT) + "/" + DB_NAME
# 准备插入或更新的SQL语句
insert_sql = """
INSERT INTO bigdata_autocheck_days_run_data_result (script_name, db_type, run_status, day, update_time)
VALUES (%s, %s, %s, %s, NOW())
ON DUPLICATE KEY UPDATE run_status = %s, update_time = NOW();
"""
# 执行INSERT...ON DUPLICATE KEY UPDATE语句
#编写sql,只更新,不插入,根据script_name为主键来更新
update_sql = """
UPDATE bigdata_autocheck_days_run_data_result
SET run_status = %s, day = %s, update_time = NOW()
WHERE script_name = %s;
"""
# 创建包含数据的 Row 对象
data = []
for script_name, db_type, run_status in zip(script_names, db_types, run_statuses):
data.append(Row(run_status=run_status,db_type=db_type, day=DATE_TO_CHECK, script_name=script_name ))
# 定义DataFrame的模式,确保包含所有列
schema = StructType([
StructField("script_name", StringType(), True),
StructField("db_type", StringType(), True),
StructField("run_status", StringType(), True),
StructField("day", StringType(), True)
])
# 使用模式创建DataFrame
df = spark.createDataFrame(data, schema)
# 创建包含数据的 Row 对象
# data = Row("script_name", "db_type", "run_status", "day", "update_time")
# df = spark.createDataFrame([data(script_name, db_type, run_status, DATE_TO_CHECK, DATE_TO_CHECK)])
df.write.format("jdbc") \
.mode("overwrite") \
.option("truncate", "true") \
.option("url", url) \
.option("dbtable", "bigdata_autocheck_days_run_data_result") \
.option("user", DB_USER) \
.option("password", DB_PASSWORD) \
.option("driver", "com.mysql.jdbc.Driver") \
.option("numPartitions", 1) \
.save()
def main():
# 标记是否有失败的任务
task_failed = False
# 收集所有表的检查结果
results = []
# 遍历每个表并检查数据量
for table, query in TABLE_QUERIES.items():
result = execute_mysql_query(query)
results.append((table, "mysql", "success" if result is not None and result != 0 else "failed"))
if result is None:
task_failed = False
print("Failed to execute query for table:", table)
elif result == 0:
task_failed = True
print("No data found for table:", table)
else:
print("Data found for table:", table)
# 检查Hive中的表
for table, query in HIVE_TABLE_QUERIES.items():
result = execute_hive_query(query)
results.append((table, "hive", "success" if result is not None and result != 0 else "failed"))
if result is None:
task_failed = False
print("Failed to execute Hive query for table:", table)
elif result == 0:
task_failed = True
print("No data found in Hive for table:", table)
else:
print("Data found in Hive for table:", table)
# 将结果写入MySQL
save_to_mysql(*zip(*results))
# 如果有任何任务失败,退出并返回非零退出码
if task_failed:
sys.exit(1)
if __name__ == "__main__":
main()
没有合适的资源?快使用搜索试试~ 我知道了~
海豚调度器自动监测每日报表及自动重跑异常工作流综合实例(亲测可用)
共2个文件
py:2个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 195 浏览量
2024-06-13
13:43:20
上传
评论
收藏 5KB ZIP 举报
温馨提示
介绍了海豚调度器如何检测报表是否跑出数据,如果获取工作流信息和启动工作流。通过这3个步骤,可以做1个自动检测海豚调度器每日报表,并根据工作流信息来自动重新跑数据异常的工作流。 autocheck_days_run_data_result.py是检测每日报表是否跑出数据, autocheck_days_run_supple_data.py是对没有跑出数的工作流进行重跑。 详细教程可参考:https://blog.csdn.net/linweidong/article/details/139650752
资源推荐
资源详情
资源评论
收起资源包目录
海豚调度器自动监测每日报表及自动重跑异常工作流代码.zip (2个子文件)
海豚调度器自动监测每日报表及自动重跑异常工作流代码
autocheck_days_run_supple_data.py 5KB
autocheck_days_run_data_result.py 6KB
共 2 条
- 1
资源评论
大模型大数据攻城狮
- 粉丝: 5015
- 资源: 62
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功