# coding:utf-8
# Count online users per hour, per day, per week, and per month.
'''
spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 4g --executor-cores 1 --py-files datetimeCalculate.py onlineCount.py
'''
import datetime
import sys
import time

from pyspark import SparkContext, StorageLevel
from pyspark.sql import HiveContext
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Shipped alongside this job via --py-files (see the spark-submit line above);
# these helpers are referenced throughout but defined in datetimeCalculate.py.
from datetimeCalculate import (get_month_from_dtstr, get_week_from_dtstr,
                               get_day_from_dtstr, get_hour_from_dtstr,
                               get_month_days)
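
# Assumed behaviour of the datetimeCalculate helpers (a sketch, not the shipped
# implementation): each *_from_dtstr function takes a log timestamp string and
# returns one component as a string, e.g. get_hour_from_dtstr('2018-04-02 09:15:30')
# -> '9', and get_month_days('2018-4') returns the number of days in that month (30).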
# Read login events from a '+'-separated CSV into a DataFrame and split the
# timestamp into month/week/day/hour columns.
def get_df_from_csv(hiveContext, path, schema):
    df = hiveContext.read.csv(path, schema, sep='+').na.drop().select('account', 'datetime')
    df = df.withColumn('month', dt_month_udf(df.datetime)) \
           .withColumn('week', dt_week_udf(df.datetime)) \
           .withColumn('day', dt_day_udf(df.datetime)) \
           .withColumn('hour', dt_hour_udf(df.datetime))
    # The udfs return strings; cast to int so sorts and range() loops work numerically.
    df = df.selectExpr('account', 'int(month)', 'int(week)', 'int(day)', 'int(hour)')
    # Dedupe so each account counts at most once per hour bucket.
    df = df.distinct().sort(['month', 'week', 'day', 'hour'])
    return df
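
# For a hypothetical raw row ('13800000000', '2018-04-02 09:15:30'), the result
# would be account='13800000000', month=4, week=14, day=2, hour=9 (the exact week
# value depends on how get_week_from_dtstr numbers weeks).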
# Count distinct online accounts per school, per hour of each day.
def get_day_hour_count_df(df):
    df0 = df.groupBy(['schoolId', 'month', 'week', 'day', 'hour']) \
            .agg({'account': 'count'}).withColumnRenamed('count(account)', 'count')
    df1 = df0.sort(['schoolId', 'month', 'week', 'day', 'hour'])
    return df1
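
# Example with made-up numbers: if 57 distinct accounts logged in at
# (schoolId=3, month=4, week=14, day=2, hour=9), the output carries a single
# row for that key with count=57.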
# Return [min, max] of a date column ('month', 'week', 'day' or 'hour').
def get_date_range(df, date_type):
    df1 = df.select(date_type).distinct()
    df2 = df1.selectExpr('min(%s) as min' % date_type, 'max(%s) as max' % date_type)
    rows = df2.collect()
    low, high = 0, 0
    if len(rows) == 1:
        low, high = rows[0]['min'], rows[0]['max']
    return [low, high]
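
# e.g. get_date_range(login_year_df, 'month') would return [4, 5] for the
# April-May window configured below (helper kept for ad-hoc use; not called
# in the main flow).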
# Convert a 'year week weekday hour' string ('%Y %W %w %H') back to 'YYYY-MM-DD'.
def get_week_time(dtstr):
    dt = time.strptime(dtstr, "%Y %W %w %H")
    return time.strftime('%Y-%m-%d', dt)
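
# e.g. get_week_time('2018 18 1 9') -> '2018-04-30', the Monday of week 18 of
# 2018 under %W numbering (also unused in the main flow, kept as a utility).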
# Write per-hour counts into a Hive table partitioned by (date, schoolId).
# Relies on the module-level min_month/max_month globals set below.
def write_hive_day_hour(hiveContext, year, df):
    table_name = 'day_hour_online'
    hiveContext.sql("create table if not exists user.%s (hour int, count int) "
                    "partitioned by (date string, schoolId int)" % table_name)
    schoolIds = df.select("schoolId").distinct().rdd.map(lambda x: x['schoolId']).collect()
    for schoolId in schoolIds:
        school_df = df.filter(df.schoolId == schoolId)
        for month in range(min_month, max_month + 1):
            month_day_count = get_month_days("%d-%d" % (year, month))
            month_df = school_df.filter(school_df.month == month)
            for day in range(1, month_day_count + 1):
                day_df = month_df.filter(month_df.day == day)
                lines = []
                for hour in range(24):
                    hour_rows = day_df.filter(day_df.hour == hour).collect()
                    # Hours with no logins get an explicit zero row.
                    hour_count = hour_rows[0]['count'] if len(hour_rows) == 1 else 0
                    lines.append({'hour': hour, 'count': hour_count})
                timestr = "%04d-%02d-%02d" % (year, month, day)
                print("day hour date: %s" % timestr)
                hiveContext.createDataFrame(lines, "hour:int, count:int") \
                           .createOrReplaceTempView('online_hour_table')
                hiveContext.sql("insert overwrite table user.%s partition(date='%s', schoolId='%d') "
                                "select * from online_hour_table" % (table_name, timestr, schoolId))
# Main entry point: bail out early if this file is imported instead of submitted.
if __name__ != "__main__":
    print("not main function, exit ...")
    sys.exit()
sc = SparkContext(appName="StatisOnlineCount")
year = 2018
min_month = 4
max_month = 5
start_dt = datetime.datetime.now()
hiveContext = HiveContext(sc)
hiveContext.sql("use user")
# Wrap the date-string helpers from datetimeCalculate as Spark UDFs (string return type).
dt_month_udf = F.udf(get_month_from_dtstr)
dt_week_udf = F.udf(get_week_from_dtstr)
dt_day_udf = F.udf(get_day_from_dtstr)
dt_hour_udf = F.udf(get_hour_from_dtstr)
hdfshost = "hdfs://192.168.1.10:8020"
login_schema = StructType([StructField("datetime", StringType(), True),
                           StructField("did", StringType(), True),
                           StructField("device", StringType(), True),
                           StructField("app", StringType(), True),
                           StructField("account", StringType(), True),
                           StructField("braseip", StringType(), True)])
schema = StructType([StructField("account", StringType(), True),
                     StructField("month", IntegerType(), True),
                     StructField("week", IntegerType(), True),
                     StructField("day", IntegerType(), True),
                     StructField("hour", IntegerType(), True)])
# Map account -> schoolId; everything downstream (the groupBy and the Hive
# partition key) expects the id column to be named schoolId, so alias it explicitly.
user_df = hiveContext.read.csv(path=hdfshost + "/data/event20180508.txt", header=True, sep='\t') \
                     .selectExpr("mobile as account", "int(userId) as schoolId")
login_year_df = hiveContext.createDataFrame([], schema)
for month in range(min_month, max_month + 1):
    dir_path = hdfshost + "/data/event/%d-%02d/" % (year, month)
    login_path = dir_path + "login-*.txt"
    scan_path = dir_path + "scan-*.txt"
    login_month_df = get_df_from_csv(hiveContext, login_path, login_schema)
    scan_month_df = get_df_from_csv(hiveContext, scan_path, login_schema)
    # Login and scan events share the same layout; merge and dedupe them.
    login_month_df1 = login_month_df.union(scan_month_df).distinct()
    login_year_df = login_year_df.union(login_month_df1).distinct()
# Count online users per hour and per day.
login_year_df1 = login_year_df.join(user_df, "account")
login_day_hour_count_df = get_day_hour_count_df(login_year_df1)
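# Cache the counts: write_hive_day_hour re-filters this DataFrame once per
# school/month/day, so recomputing it from the raw CSVs each time would be costly.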
login_day_hour_count_df.persist(StorageLevel.MEMORY_AND_DISK_SER)
write_hive_day_hour(hiveContext, year, login_day_hour_count_df)
login_day_hour_count_df.unpersist()
minutes = (datetime.datetime.now() - start_dt).total_seconds() / 60.0
print('this job takes %.2f minutes' % minutes)
sc.stop()