学习大数据DAY61 宽表加工

目录

模型设计

加工宽表

任务调度:


大表 - 把很多数据整合起来
方便后续的明细查询和指标计算

模型设计

设计 建模
设计: excel 文档去编写
建模: 使用建模工具 PowerDesigner Navicat 在线画图工具... 把表结构给绘
制出来
共享\项目课工具\pd

加工宽表

数据层 DWS 层
dws_lijinquan.dws_xbd_mxm_memberinfo_dim_t
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
import os
import re
# 宽表加工
# pyspark + spark sql
# 宽表加工
# pyspark + spark sql
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 会员检测临时表
def do_member_tmp_check():
sql = '''
SELECT
member_id AS member, -- 会员卡号
MIN(CASE WHEN detect_time = max_detect_time THEN erp_code END)
AS rec_detect_store, -- 最近检测门店
max(detect_time) AS rec_detect_date, -- 最近检测时间
count(1) AS check_count, -- 累计检测次数
min(substr(detect_time,1,10)) AS filing_date, -- 建档时间
min(CASE WHEN detect_time = min_detect_time THEN erp_code END)
AS store_name, -- 建档门店名称
max(extend) AS is_anamnesis, -- 有无既往病史
CASE WHEN COUNT(bec_chr_mbr_date) > 0 THEN 1 ELSE 0 END AS
is_chr_mbr -- 是否特慢病会员
FROM (
SELECT
*,
MIN(detect_time) OVER (PARTITION BY member_id) AS
min_detect_time,
MAX(detect_time) OVER (PARTITION BY member_id) AS
max_detect_time
FROMchange_shihaihong.his_chronic_patient_info_new
)
GROUP BY member_id
'''
df = spark.sql(sql)
df.show()
# 保存到 hive: change_shihaihong.member_check
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/member_check")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/member_check")
\
.saveAsTable("change_shihaihong.member_check")
print("写入 hive 表成功")
# 会员订单情况临时表
def do_member_tmp_sale():
@F.udf()
def handle_pay_fav_type(val1,val2,val3):
payments = {
"银行卡": val1,
"手机支付": val2,
"现金": val3
}
payment_tuples = list(payments.items())
payment_tuples.sort(key=lambda x: (-x[1], x[0])) #使用
负值来确保从大到小排序
result_strings = [method for method, _ in payment_tuples]
# 使用>符号连接字符串result = '>'.join(result_strings)
return result
sql='''
select
m.member, -- 会员卡号
count(1) as order_total, -- 总订单数
round(sum(m.precash),2) as order_amount, -- 消费总额
max(m.starttime) as last_order_date, -- 最后一单日期
min(m.starttime) as first_order_date, -- 首单日期
count(case when m.starttime >= date_sub('2018-01-01',
30) then 1 end) as order_30, -- 30 天订单量
count(case when m.starttime >= date_sub('2018-01-01',
90) then 1 end) as order_90, -- 90 天订单量
sum(case when m.starttime >= date_sub('2018-01-01', 30)
then round(m.precash,2) else 0 end) as amount_30, -- 30 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01', 90)
then round(m.precash,2) else 0 end) as amount_90, -- 90 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01',
180) then round(m.precash,2) else 0 end) as amount_180, -- 180
天消费金额
count(case when eusp.paytype = '银行卡 ' then 1 end) as
bank_count,
count(case when eusp.paytype = '手机支付' then 1 end)
as credit_card_count,
count(case when eusp.paytype = '现金 ' then 1 end) as
cash_count,
'' as pay_fav_type
from change_shihaihong.erp_u_sale_m_inc_new m
left join change_shihaihong.erp_u_sale_pay_inc_new eusp
on m.saleno = eusp.saleno -- 确保连接条件是正确的
group by m.member
'''
df = spark.sql(sql)
df = df.withColumn("pay_fav_type",
handle_pay_fav_type("bank_count","credit_card_count","cash_cou
nt"))
df.drop("bank_count")
df.drop("credit_card_count")
df.drop("cash_count")
df.show()# 保存到 Hive: change_shihaihong.member_sale
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat", "parquet") \
.option("location",
"/zhiyun/shihaihong/change/member_sale") \
.saveAsTable("change_shihaihong.member_sale")
print("临时表保存成功")
print("写入 hive 表成功")
# 宽表加工
def do_member_table():
# 主表可以列最多的那个表
# 会员的订单情况可以用子查询统计出来, 也可以使用临时表
sql = '''
with t as (select * from
change_shihaihong.crm_user_base_info_his_new )
select
t.user_id as mbr_code, -- 会员编码
t.user_type as mbr_type, -- 会员类型
t.source as mbr_resource, -- 会员来源
m.memcardno as mbr_cardno, -- 会员卡号
t.erp_code as store_code, -- 注册门店编码
t.active_time as sto_reg_date, -- 门店注册日期
"" as reg_platform, -- 注册外部平台
"" as platform_reg_date, -- 外部平台注册时间
t.name as name, -- 姓名
t.sex as gender, -- 性别
t.birthday as birthdate, -- 出生日期
t.age as age, -- 年龄
t.id_card_no as mbr_id_card, -- 身份证号
t.social_insurance_no as social_security_no, -- 社保卡号
t.education as edu_background, -- 教育背景
t.job as profession, -- 职业
"未知" as is_marriage, -- 婚姻状况
"无" as have_children, -- 是否有孩
t.address as address, -- 通信地址
"" as region, -- 区域
m.province as province, -- 省
m.city as city, -- 城市t.last_subscribe_time as cancel_date, -- 注销时间
m.tel as phone, -- 联系电话
m.handset as cell_phone, -- 手机号
t.email as email, -- 邮箱
t.wechat as wechat, -- 微信账号
t.webo as weibo, -- 微博账号
"" as alipay, -- 支付宝账号
"" as app, -- APP 账号
sale.order_total as order_total, -- 总订单数
sale.order_amount as order_amount, -- 消费总额
sale.last_order_date as last_order_date, -- 最后一单日期
sale.first_order_date as first_order_date, -- 首单日期
sale.order_30 as order_30, -- 30 天订单量
sale.order_90 as order_90, -- 90 天订单量
sale.amount_30 as amount_30, -- 30 天消费金额
sale.amount_90 as amount_90, -- 90 天消费金额
sale.amount_180 as amount_180, -- 180 天消费金额
sale.pay_fav_type as pay_fav_type, -- 付款方式偏爱排行
g.groupname as group_name, -- 会员分组
"" as ware_buy_sort, -- 药品购买排行
m.ness as sickness_motion, -- 疾病关注
check.rec_detect_store as rec_detect_store, -- 最近检测门
店
check.rec_detect_date as rec_detect_date, -- 最近检测时间
check.check_count as check_count, -- 累计检测次数
check.filing_date as filing_date, -- 建档时间
check.store_name as store_name, -- 建档门店名称
check.is_anamnesis as is_anamnesis, -- 有无既往病史
check.is_chr_mbr as is_chr_mbr, -- 是否特慢病会员
current_timestamp as etl_time, -- ETL 时间
"ETL by qinyuxiao" as comments -- 备注信息
from t
left join change_shihaihong.erp_u_memcard_reg_full_new m on
m.scrm_userid = t.user_id
left join change_shihaihong.member_sale sale on sale.member =
m.memcardno
left join change_shihaihong.member_check check on
check.member = m.memcardno
left join dwd_qinyuxiao.erp_c_memcard_class_group g on
sale.order_amount >=g.lg and sale.order_amount < g.gt
'''
df = spark.sql(sql)df.show()
# 保存
# 保存到 hive: dws_xbd_mxm_memberinfo_dim_t
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/dws/dws_xbd_mxm_memberinfo_dim_t")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists dws_shihaihong location
"/zhiyun/shihaihong/dws";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/dws/dws_xbd_mxm_memb
erinfo_dim_t"). \
saveAsTable("dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t")
print("写入 hive 表成功")
# 验证数据
# 注意总数据量应该跟 CRM 表一致 168W 整 多一条都不行
# 计算原表的总记录数
original_count_sql = "select count(1) from
change_shihaihong.crm_user_base_info_his_new"
original_count =
spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}")
do_member_tmp_check()
do_member_tmp_sale()
do_member_table()
print("宽表加工完成")# 部署

任务调度:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/18486.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

DBeaver MACOS 安装 并连接到docker安装的mysql

官网下载&#xff1a;Download | DBeaver Community 网盘下载&#xff1a;链接: https://pan.baidu.com/s/15fAhbflHO-AGc-uAnc3Rjw?pwdbrz9 提取码: brz9 下载驱动 连接测试 报错 null, message from server: "Host 172.17.0.1 is not allowed to connect to this M…

24首届数证杯(流量分析部分)

目录 流量分析 流量分析 1、分析网络流量包检材&#xff0c;写出抓取该流量包时所花费的秒数?(填写数字&#xff0c;答案格式:10) 3504相加即可 2、分析网络流量包检材&#xff0c;抓取该流量包时使用计算机操作系统的build版本是多少? 23F793、分析网络流量包检材&#x…

云服务器ECS经济型e实例和通用算力u1实例有啥区别?

阿里云服务器ECS经济型e实例怎么样&#xff1f;对比ECS通用算力型u1实例哪个更好&#xff1f;u1实例更好。阿里云服务器网aliyunfuwuqi.com二者均为云服务器ECS的实例规格&#xff0c;e实例是共享型云服务器&#xff0c;u1实例是独享型云服务器&#xff0c;何为共享&#xff1f…

QT中使用图表之QChart绘制柱状图

绘制条形&#xff08;柱状&#xff09;图&#xff0c;系列选择条形系列QBarSeries x轴选择条形图的种类轴QBarCategoryAxis 1、创建图表视图 //1、创建图表视图 QChartView * view new QChartView(this); //开启抗锯齿 view -> setRenderHint(QPainter::Antialiasing); …

Essential Cell Biology--Fifth Edition--Chapter one (8)

1.1.4.6 The Cytoskeleton [细胞骨架] Is Responsible for Directed Cell Movements 细胞质基液不仅仅是一种无结构的化学物质和细胞器的混合物[soup]。在电子显微镜下&#xff0c;我们可以看到真核细胞的细胞质基液是由长而细的丝交叉而成的。通常[Frequently]&#xff0c;可…

【Linux】守护进程

目录 进程组 会话 作业控制 实现守护进程 我们在写完一些网络服务后&#xff0c;如果想让这个服务一直在云服务器的后台运行着&#xff0c;那该如何实现呢&#xff1f;其实就用到了这篇博客要讲的守护进程 进程组 我们首先需要了解进程组的概念&#xff0c;其实sleep 1000这…

nginx.conf配置文件中的命令

打开我们的conf文件 nginx.conf文件中&#xff0c;分为3大块&#xff1a; 全局块&#xff0c;就是events和http块之外的内容。设置nginx服务器整体运行的指令 格式为&#xff1a; 指令名 指令值 events块&#xff0c;用于配置与用户的网络连接的内容&#xff0c;对nginx的…

51单片机基础07 实时时钟-思路及代码参考1

目录 一、实现功能 二、思路1的分析 1、定时器0 2、外部中断0 3、主函数main 4、其他重要功能函数 一、实现功能 1、实现最基本的计时功能&#xff0c;显示时、分、秒&#xff0c;可以通过按键设置时间。 要求&#xff1a;时钟计时精确&#xff0c;按键操作不影响计时。…

vTESTstudio系列15--vTESTstudio-Doors的需求和测试用例的管理

最近有朋友在咨询vTESTstudio中怎么去跟Doors里面的需求去做好管理这方面的问题&#xff0c;临时加两篇文章介绍一下,Lets Go!!! 目录 1.Doors的配置&#xff1a; 1.1 安装Doors AddIn for vTESTstudio&#xff1a; 1.2 更新XML脚本&#xff1a; 1.3 导出需求的Trace Item…

基于Java Springboot编程语言在线学习平台

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…

JDK安装报错“以下应用程序正在使用需要由此安装程序更新的文件”

&#xff08;一&#xff09;问题描述 我刚刚没有截图&#xff0c;这是我在网上看到的图&#xff1a; &#xff08;二&#xff09;可能的解决办法 1. 下方工具栏右键&#xff0c;打开任务管理器按钮&#xff0c;在进程中找到“Java Platform SE binary” 进程&#xff0c;右键结…

数据库第3次作业

学生表&#xff1a;Student (Sno, Sname, Ssex , Sage, Sdept) 学号&#xff0c;姓名&#xff0c;性别&#xff0c;年龄&#xff0c;所在系 Sno为主键 课程表&#xff1a;Course (Cno, Cname,) 课程号&#xff0c;课程名 Cno为主键 学生选课表&#xff1a;SC (Sno, Cno, Score)…

Linux之文件系统,软硬连接和动静态库

Linux之文件系统&#xff0c;软硬连接和动静态库 一.文件系统1.1磁盘的存储结构1.2CHS和LBA1.3ext2文件系统 二.软硬连接2.1软链接2.2硬链接 三.静态库和动态库3.1静态库与动态库的概念3.2静态库的创建与使用3.3动态库的创建与使用3.4动态库的加载 一.文件系统 在上篇的学习中…

【项目开发】URL中井号(#)的技术细节

未经许可,不得转载。 文章目录 前言一、# 的基本含义二、# 不参与 HTTP 请求三、# 后的字符处理机制四、# 的变化不会触发网页重新加载五、# 的变化会记录在浏览器历史中六、通过 window.location.hash 操作七、onhashchange 事件八、Google 对 # 的处理机制前言 2023 年 9 月…

TikZ 绘图学习笔记

这篇笔记的所有代码如下&#xff1a; % !TEX TS-program pdflatex % !TEX encoding UTF-8 Unicode% This is a simple template for a LaTeX document using the "article" class. % See "book", "report", "letter" for other typ…

Android Framework层介绍

文章目录 前言一、Android Framework 层概述二、主要组件1. 应用程序接口&#xff08;API&#xff09;2. 系统服务3. Binder4. 资源管理5. Content Provider6. 广播接收器&#xff08;BroadcastReceiver&#xff09;7. 服务&#xff08;Service&#xff09; 三、与 Linux Kerne…

如何选择等保服务

在当今信息化高速发展的时代&#xff0c;企业信息系统已成为业务运营的核心支撑&#xff0c;其安全性直接关系到企业的生存与发展。为了应对日益复杂的网络安全威胁&#xff0c;国家推行了等级保护&#xff08;简称等保&#xff09;制度&#xff0c;作为一项基本的信息安全保障…

MCU中的定时器

第一章 定时器的应用场景 第二章 定时器的原理 2.1 定时器的计数原理 1. 定时器的本质是一个计数器&#xff1b; 2. 计数器是对输入的系统频率信号进行计数&#xff1b; 3. 每来一个周期的信号&#xff0c;计数器的cnt 加一。如果周期T表示为1s&#xff0c;来三个周期就表示…

主页任务与计算器任务

一、主页任务 /* Private includes -----------------------------------------------------------*/ //includes #include "user_TasksInit.h" #include "user_ScrRenewTask.h" #include "main.h" #include "rtc.h" #include "…

javascript 入门-01-变量声明

因缘际会 Alice: 编程入门好像很难吧,我能学会吗 ?我虽然是计算机专业的,但是我几乎没怎么写过代码。但是你先别说我菜,我身边的同学大家都是这样的 🤷 Bob: 那你能写冒泡排序或者求数组最大值吗 ? Alice: 冒泡排序写不出来,求数组最大值还能试试看。不过为什么问这个…