#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'panda-coding'
import datetime
import sys
from pyspark.sql import SparkSession
if sys.getdefaultencoding() != 'utf-8':
reload(sys)
sys.setdefaultencoding('utf-8')
if len(sys.argv) > 1:
dt_str = sys.argv[1]
else:
yes_date = datetime.date.today() + datetime.timedelta(-1)
dt_str = str(yes_date)
print("--------------dt=%s" % dt_str)
# 数据重复性校验
def check(spark):
sql = """
select dept_name,
sku_id,
sku_name,
num
from (
select '事业部A' as dept_name,'1111' as sku_id,'SKU1' as sku_name,16 as num
union all
select '事业部B' as dept_name,'2222' as sku_id,'SKU2' as sku_name,18 as num
) M1
"""
print("------------sql=%s" % sql)
dataRows = spark.sql(sql).collect()
html = "<h2>不同事业部SKU明细:</h2>"
html += "<table border='1' style='border-collapse: collapse; border-spacing: 0; '>"
html += "<tr><th >事业部名称</th><th >商品ID</th><th >商品名称</th><th >数量</th></tr>"
for row in dataRows:
html += "<tr style='text-align: center;color:red'><td >" + str(row['dept_name']) + "</td><td >" + str(row['sku_id']) + "</td><td >" + str(row['sku_name']) + "</td><td >" + str(row['num']) + "</td> </tr>"
html += "</table>"
return html
def sendMail(spark):
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.mime.multipart import MIMEMultipart
conf = {
"email_recipients": ["123@qq.com","234@qq.com"],
"email_from": "公共邮箱",
"email_title": "【请关注】邮件发送-title",
"email_service_host": "smtp.test.local",
"email_service_port": 111,
"email_service_user": "customer_service",
"email_service_password": "password",
"email_sender": "customer_service@qq.com"
}
recipients = conf["email_recipients"]
html = check(spark)
html += "<br/><br/>"
message = MIMEMultipart()
message['From'] = Header(conf["email_from"], 'utf-8')
message['To'] = Header(";".join(recipients), 'utf-8')
message['Subject'] = Header(title, 'utf-8')
message.attach(MIMEText(html, 'html', 'utf-8'))
smtp = smtplib.SMTP()
smtp.connect(conf["email_service_host"], conf["email_service_port"])
smtp.login(conf["email_service_user"], conf["email_service_password"])
smtp.sendmail(conf["email_sender"], recipients, message.as_string())
smtp.quit()
if __name__ == "__main__":
spark = SparkSession.builder.appName("address_dc").enableHiveSupport().getOrCreate()
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict;")
sendMail(spark)