目录
模型设计
加工宽表
任务调度:
大表 - 把很多数据整合起来
方便后续的明细查询和指标计算
模型设计
设计 建模
设计: excel 文档去编写
建模: 使用建模工具 PowerDesigner Navicat 在线画图工具... 把表结构给绘
制出来
共享\项目课工具\pd
加工宽表
数据层 DWS 层
dws_lijinquan.dws_xbd_mxm_memberinfo_dim_t
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
import os
import re
# 宽表加工
# pyspark + spark sql
# 宽表加工
# pyspark + spark sql
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 会员检测临时表
def do_member_tmp_check():
sql = '''
SELECT
member_id AS member, -- 会员卡号
MIN(CASE WHEN detect_time = max_detect_time THEN erp_code END)
AS rec_detect_store, -- 最近检测门店
max(detect_time) AS rec_detect_date, -- 最近检测时间
count(1) AS check_count, -- 累计检测次数
min(substr(detect_time,1,10)) AS filing_date, -- 建档时间
min(CASE WHEN detect_time = min_detect_time THEN erp_code END)
AS store_name, -- 建档门店名称
max(extend) AS is_anamnesis, -- 有无既往病史
CASE WHEN COUNT(bec_chr_mbr_date) > 0 THEN 1 ELSE 0 END AS
is_chr_mbr -- 是否特慢病会员
FROM (
SELECT
*,
MIN(detect_time) OVER (PARTITION BY member_id) AS
min_detect_time,
MAX(detect_time) OVER (PARTITION BY member_id) AS
max_detect_time
FROMchange_shihaihong.his_chronic_patient_info_new
)
GROUP BY member_id
'''
df = spark.sql(sql)
df.show()
# 保存到 hive: change_shihaihong.member_check
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/member_check")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/member_check")
\
.saveAsTable("change_shihaihong.member_check")
print("写入 hive 表成功")
# 会员订单情况临时表
def do_member_tmp_sale():
@F.udf()
def handle_pay_fav_type(val1,val2,val3):
payments = {
"银行卡": val1,
"手机支付": val2,
"现金": val3
}
payment_tuples = list(payments.items())
payment_tuples.sort(key=lambda x: (-x[1], x[0])) #使用
负值来确保从大到小排序
result_strings = [method for method, _ in payment_tuples]
# 使用>符号连接字符串result = '>'.join(result_strings)
return result
sql='''
select
m.member, -- 会员卡号
count(1) as order_total, -- 总订单数
round(sum(m.precash),2) as order_amount, -- 消费总额
max(m.starttime) as last_order_date, -- 最后一单日期
min(m.starttime) as first_order_date, -- 首单日期
count(case when m.starttime >= date_sub('2018-01-01',
30) then 1 end) as order_30, -- 30 天订单量
count(case when m.starttime >= date_sub('2018-01-01',
90) then 1 end) as order_90, -- 90 天订单量
sum(case when m.starttime >= date_sub('2018-01-01', 30)
then round(m.precash,2) else 0 end) as amount_30, -- 30 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01', 90)
then round(m.precash,2) else 0 end) as amount_90, -- 90 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01',
180) then round(m.precash,2) else 0 end) as amount_180, -- 180
天消费金额
count(case when eusp.paytype = '银行卡 ' then 1 end) as
bank_count,
count(case when eusp.paytype = '手机支付' then 1 end)
as credit_card_count,
count(case when eusp.paytype = '现金 ' then 1 end) as
cash_count,
'' as pay_fav_type
from change_shihaihong.erp_u_sale_m_inc_new m
left join change_shihaihong.erp_u_sale_pay_inc_new eusp
on m.saleno = eusp.saleno -- 确保连接条件是正确的
group by m.member
'''
df = spark.sql(sql)
df = df.withColumn("pay_fav_type",
handle_pay_fav_type("bank_count","credit_card_count","cash_cou
nt"))
df.drop("bank_count")
df.drop("credit_card_count")
df.drop("cash_count")
df.show()# 保存到 Hive: change_shihaihong.member_sale
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat", "parquet") \
.option("location",
"/zhiyun/shihaihong/change/member_sale") \
.saveAsTable("change_shihaihong.member_sale")
print("临时表保存成功")
print("写入 hive 表成功")
# 宽表加工
def do_member_table():
# 主表可以列最多的那个表
# 会员的订单情况可以用子查询统计出来, 也可以使用临时表
sql = '''
with t as (select * from
change_shihaihong.crm_user_base_info_his_new )
select
t.user_id as mbr_code, -- 会员编码
t.user_type as mbr_type, -- 会员类型
t.source as mbr_resource, -- 会员来源
m.memcardno as mbr_cardno, -- 会员卡号
t.erp_code as store_code, -- 注册门店编码
t.active_time as sto_reg_date, -- 门店注册日期
"" as reg_platform, -- 注册外部平台
"" as platform_reg_date, -- 外部平台注册时间
t.name as name, -- 姓名
t.sex as gender, -- 性别
t.birthday as birthdate, -- 出生日期
t.age as age, -- 年龄
t.id_card_no as mbr_id_card, -- 身份证号
t.social_insurance_no as social_security_no, -- 社保卡号
t.education as edu_background, -- 教育背景
t.job as profession, -- 职业
"未知" as is_marriage, -- 婚姻状况
"无" as have_children, -- 是否有孩
t.address as address, -- 通信地址
"" as region, -- 区域
m.province as province, -- 省
m.city as city, -- 城市t.last_subscribe_time as cancel_date, -- 注销时间
m.tel as phone, -- 联系电话
m.handset as cell_phone, -- 手机号
t.email as email, -- 邮箱
t.wechat as wechat, -- 微信账号
t.webo as weibo, -- 微博账号
"" as alipay, -- 支付宝账号
"" as app, -- APP 账号
sale.order_total as order_total, -- 总订单数
sale.order_amount as order_amount, -- 消费总额
sale.last_order_date as last_order_date, -- 最后一单日期
sale.first_order_date as first_order_date, -- 首单日期
sale.order_30 as order_30, -- 30 天订单量
sale.order_90 as order_90, -- 90 天订单量
sale.amount_30 as amount_30, -- 30 天消费金额
sale.amount_90 as amount_90, -- 90 天消费金额
sale.amount_180 as amount_180, -- 180 天消费金额
sale.pay_fav_type as pay_fav_type, -- 付款方式偏爱排行
g.groupname as group_name, -- 会员分组
"" as ware_buy_sort, -- 药品购买排行
m.ness as sickness_motion, -- 疾病关注
check.rec_detect_store as rec_detect_store, -- 最近检测门
店
check.rec_detect_date as rec_detect_date, -- 最近检测时间
check.check_count as check_count, -- 累计检测次数
check.filing_date as filing_date, -- 建档时间
check.store_name as store_name, -- 建档门店名称
check.is_anamnesis as is_anamnesis, -- 有无既往病史
check.is_chr_mbr as is_chr_mbr, -- 是否特慢病会员
current_timestamp as etl_time, -- ETL 时间
"ETL by qinyuxiao" as comments -- 备注信息
from t
left join change_shihaihong.erp_u_memcard_reg_full_new m on
m.scrm_userid = t.user_id
left join change_shihaihong.member_sale sale on sale.member =
m.memcardno
left join change_shihaihong.member_check check on
check.member = m.memcardno
left join dwd_qinyuxiao.erp_c_memcard_class_group g on
sale.order_amount >=g.lg and sale.order_amount < g.gt
'''
df = spark.sql(sql)df.show()
# 保存
# 保存到 hive: dws_xbd_mxm_memberinfo_dim_t
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/dws/dws_xbd_mxm_memberinfo_dim_t")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists dws_shihaihong location
"/zhiyun/shihaihong/dws";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/dws/dws_xbd_mxm_memb
erinfo_dim_t"). \
saveAsTable("dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t")
print("写入 hive 表成功")
# 验证数据
# 注意总数据量应该跟 CRM 表一致 168W 整 多一条都不行
# 计算原表的总记录数
original_count_sql = "select count(1) from
change_shihaihong.crm_user_base_info_his_new"
original_count =
spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}")
do_member_tmp_check()
do_member_tmp_sale()
do_member_table()
print("宽表加工完成")# 部署