MyException - 我的异常网
当前位置:我的异常网» 数据仓库 » 基于Hadoop生态圈的数据仓库实践 —— 进阶技术(4)

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(4)

www.MyException.Cn  网友分享于:2013-07-26  浏览:0次
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(四)
四、角色扮演维度
        当一个事实表多次引用一个维度表时会用到角色扮演维度。例如,一个销售订单有一个是订单日期,还有一个交货日期,这时就需要引用日期维度表两次。
        本节将说明两类角色扮演维度的实现,分别是表别名和数据库视图。这两种都使用了Hive的功能。表别名是在SQL语句里引用维度表多次,每次引用都赋予维度表一个别名。而数据库视图,则是按照事实表需要引用维度表的次数,建立相同数量的视图。
1. 修改数据库模式
        使用下面的脚本修改数据库模式。分别给数据仓库里的事实表sales_order_fact和源数据库中订单销售表sales_order增加request_delivery_date_sk和request_delivery_date列。
-- in hive
USE dw; 

-- sales_order_fact表是ORC格式,增加列需要重建数据
ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old; 
CREATE TABLE sales_order_fact (  
    order_sk INT comment 'order surrogate key',      
    customer_sk INT comment 'customer surrogate key',      
    product_sk INT comment 'product surrogate key',      
    order_date_sk INT comment 'date surrogate key',
    request_delivery_date_sk INT comment 'request delivery date surrogate key',
    order_amount DECIMAL(10 , 2 ) comment 'order amount',  
    order_quantity INT COMMENT 'order_quantity'  
)  
CLUSTERED BY (order_sk) INTO 8 BUCKETS  
STORED AS ORC TBLPROPERTIES ('transactional'='true');  
INSERT INTO sales_order_fact 
SELECT order_sk, customer_sk, product_sk, order_date_sk, NULL, order_amount, order_quantity
  FROM sales_order_fact_old;  
DROP TABLE sales_order_fact_old;  

USE rds;  
ALTER TABLE sales_order ADD COLUMNS (request_delivery_date DATE COMMENT 'request delivery date') ;  

-- in mysql
USE source;  
ALTER TABLE sales_order ADD request_delivery_date DATE AFTER order_date ;
        修改后源数据库模式如下图所示。
        修改后DW数据库模式如下图所示。

        Hive不能像MySQL那样指定新增列的位置,它新增的列都是在表的最后。
2. 重建Sqoop作业
        使用下面的脚本重建Sqoop作业,增加request_delivery_date列。
last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
sqoop job \
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity, request_delivery_date" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column order_number \
--last-value $last_value
        注意columns参数值中列的顺序(MySQL里的source.sales_order)要和rds.sales_order的顺序保持一致。
3. 修改定期装载regular_etl.sql文件
        定期装载HiveQL脚本需要增加对交货日期列的处理,修改后的脚本如下所示。
-- 设置变量以支持事务  
set hive.support.concurrency=true;  
set hive.exec.dynamic.partition.mode=nonstrict;  
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;  
set hive.compactor.initiator.on=true;  
set hive.compactor.worker.threads=1;  
  
USE dw;  
    
-- 设置SCD的生效时间和过期时间  
SET hivevar:cur_date = CURRENT_DATE();  
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);  
SET hivevar:max_date = CAST('2200-01-01' AS DATE);  
    
-- 设置CDC的上限时间  
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;  
  
-- 装载customer维度  
-- 设置已删除记录和地址相关列上SCD2的过期,用<=>运算符处理NULL值。  
UPDATE customer_dim   
   SET expiry_date = ${hivevar:pre_date}    
 WHERE customer_dim.customer_sk IN    
(SELECT a.customer_sk   
   FROM (SELECT customer_sk,  
                customer_number,  
                customer_street_address,  
                customer_zip_code,  
                customer_city,  
                customer_state,  
                shipping_address,  
                shipping_zip_code,  
                shipping_city,  
                shipping_state  
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN   
                rds.customer b ON a.customer_number = b.customer_number   
          WHERE b.customer_number IS NULL OR   
          (  !(a.customer_street_address <=> b.customer_street_address)  
          OR !(a.customer_zip_code <=> b.customer_zip_code)  
          OR !(a.customer_city <=> b.customer_city)  
          OR !(a.customer_state <=> b.customer_state)  
          OR !(a.shipping_address <=> b.shipping_address)  
          OR !(a.shipping_zip_code <=> b.shipping_zip_code)  
          OR !(a.shipping_city <=> b.shipping_city)  
          OR !(a.shipping_state <=> b.shipping_state)  
          ));   
  
-- 处理customer_street_addresses列上SCD2的新增行    
INSERT INTO customer_dim  
SELECT  
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,  
    t1.customer_number,  
    t1.customer_name,  
    t1.customer_street_address,  
    t1.customer_zip_code,  
    t1.customer_city,  
    t1.customer_state,  
    t1.shipping_address,  
    t1.shipping_zip_code,  
    t1.shipping_city,  
    t1.shipping_state,  
    t1.version,  
    t1.effective_date,  
    t1.expiry_date  
FROM    
(    
SELECT    
    t2.customer_number customer_number,  
    t2.customer_name customer_name,  
    t2.customer_street_address customer_street_address,  
    t2.customer_zip_code customer_zip_code,  
    t2.customer_city customer_city,  
    t2.customer_state customer_state,  
    t2.shipping_address shipping_address,  
    t2.shipping_zip_code shipping_zip_code,  
    t2.shipping_city shipping_city,  
    t2.shipping_state shipping_state,  
    t1.version + 1 version,  
    ${hivevar:pre_date} effective_date,    
    ${hivevar:max_date} expiry_date    
 FROM customer_dim t1   
INNER JOIN rds.customer t2    
   ON t1.customer_number = t2.customer_number     
  AND t1.expiry_date = ${hivevar:pre_date}    
 LEFT JOIN customer_dim t3   
   ON t1.customer_number = t3.customer_number   
  AND t3.expiry_date = ${hivevar:max_date}    
WHERE (!(t1.customer_street_address <=> t2.customer_street_address)  
   OR  !(t1.customer_zip_code <=> t2.customer_zip_code)  
   OR  !(t1.customer_city <=> t2.customer_city)  
   OR  !(t1.customer_state <=> t2.customer_state)  
   OR  !(t1.shipping_address <=> t2.shipping_address)  
   OR  !(t1.shipping_zip_code <=> t2.shipping_zip_code)  
   OR  !(t1.shipping_city <=> t2.shipping_city)  
   OR  !(t1.shipping_state <=> t2.shipping_state)  
   )  
  AND t3.customer_sk IS NULL) t1    
CROSS JOIN    
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;  
  
-- 处理customer_name列上的SCD1  
-- 因为hive的update的set子句还不支持子查询,所以这里使用了一个临时表存储需要更新的记录,用先delete再insert代替update  
-- 因为SCD1本身就不保存历史数据,所以这里更新维度表里的所有customer_name改变的记录,而不是仅仅更新当前版本的记录  
DROP TABLE IF EXISTS tmp;  
CREATE TABLE tmp AS  
SELECT  
    a.customer_sk,  
    a.customer_number,  
    b.customer_name,  
    a.customer_street_address,  
    a.customer_zip_code,  
    a.customer_city,  
    a.customer_state,  
    a.shipping_address,  
    a.shipping_zip_code,  
    a.shipping_city,  
    a.shipping_state,  
    a.version,  
    a.effective_date,  
    a.expiry_date  
  FROM customer_dim a, rds.customer b    
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);    
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);    
INSERT INTO customer_dim SELECT * FROM tmp;  
  
-- 处理新增的customer记录   
INSERT INTO customer_dim  
SELECT  
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,  
    t1.customer_number,  
    t1.customer_name,  
    t1.customer_street_address,  
    t1.customer_zip_code,  
    t1.customer_city,  
    t1.customer_state,  
    t1.shipping_address,  
    t1.shipping_zip_code,  
    t1.shipping_city,  
    t1.shipping_state,  
    1,  
    ${hivevar:pre_date},  
    ${hivevar:max_date}  
FROM    
(    
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number    
 WHERE t2.customer_sk IS NULL) t1    
CROSS JOIN    
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;  
  
-- 重载PA客户维度  
TRUNCATE TABLE pa_customer_dim;    
INSERT INTO pa_customer_dim    
SELECT    
  customer_sk    
, customer_number    
, customer_name    
, customer_street_address    
, customer_zip_code    
, customer_city    
, customer_state    
, shipping_address    
, shipping_zip_code    
, shipping_city    
, shipping_state    
, version    
, effective_date    
, expiry_date    
FROM customer_dim    
WHERE customer_state = 'PA' ;   
  
-- 装载product维度  
-- 设置已删除记录和product_name、product_category列上SCD2的过期  
UPDATE product_dim  
   SET expiry_date = ${hivevar:pre_date}    
 WHERE product_dim.product_sk IN    
(SELECT a.product_sk   
   FROM (SELECT product_sk,product_code,product_name,product_category   
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN   
                rds.product b ON a.product_code = b.product_code   
          WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));  
  
-- 处理product_name、product_category列上SCD2的新增行    
INSERT INTO product_dim  
SELECT  
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,  
    t1.product_code,  
    t1.product_name,  
    t1.product_category,  
    t1.version,  
    t1.effective_date,  
    t1.expiry_date  
FROM    
(    
SELECT    
    t2.product_code product_code,  
    t2.product_name product_name,  
    t2.product_category product_category,      
    t1.version + 1 version,  
    ${hivevar:pre_date} effective_date,    
    ${hivevar:max_date} expiry_date    
 FROM product_dim t1   
INNER JOIN rds.product t2    
   ON t1.product_code = t2.product_code    
  AND t1.expiry_date = ${hivevar:pre_date}    
 LEFT JOIN product_dim t3   
   ON t1.product_code = t3.product_code   
  AND t3.expiry_date = ${hivevar:max_date}    
WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1    
CROSS JOIN    
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;  
  
-- 处理新增的product记录  
INSERT INTO product_dim  
SELECT  
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,  
    t1.product_code,  
    t1.product_name,  
    t1.product_category,  
    1,  
    ${hivevar:pre_date},  
    ${hivevar:max_date}  
FROM    
(    
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code    
 WHERE t2.product_sk IS NULL) t1    
CROSS JOIN    
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;  
  
-- 装载order维度  
INSERT INTO order_dim  
SELECT  
    ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,  
    t1.order_number,  
    t1.version,  
    t1.effective_date,  
    t1.expiry_date  
  FROM  
(  
SELECT  
    order_number order_number,  
    1 version,  
    order_date effective_date,  
    '2200-01-01' expiry_date  
  FROM rds.sales_order, rds.cdc_time   
 WHERE entry_date >= last_load AND entry_date < current_load ) t1  
CROSS JOIN    
(SELECT COALESCE(MAX(order_sk),0) sk_max FROM order_dim) t2;  
  
-- 装载销售订单事实表  
INSERT INTO sales_order_fact  
SELECT  
    order_sk,  
    customer_sk,  
    product_sk,  
    e.date_sk,
    f.date_sk,
    order_amount,  
    order_quantity	
  FROM  
    rds.sales_order a,  
    order_dim b,  
    customer_dim c,  
    product_dim d,  
    date_dim e,
    date_dim f,
    rds.cdc_time g	
 WHERE  
    a.order_number = b.order_number  
AND a.customer_number = c.customer_number  
AND a.order_date >= c.effective_date  
AND a.order_date < c.expiry_date  
AND a.product_code = d.product_code  
AND a.order_date >= d.effective_date  
AND a.order_date < d.expiry_date  
AND to_date(a.order_date) = e.date 
AND to_date(a.request_delivery_date) = f.date 
AND a.entry_date >= g.last_load AND a.entry_date < g.current_load ;  
  
-- 更新时间戳表的last_load字段  
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;
4. 测试
(1)执行下面的SQL脚本增加三个带有交货日期的销售订单。
USE source;
/***      
新增订单日期为2016年7月17日的3条订单。  
***/    
SET @start_date := unix_timestamp('2016-07-17');    
SET @end_date := unix_timestamp('2016-07-18'); 
SET @request_delivery_date := '2016-07-20';   
DROP TABLE IF EXISTS temp_sales_order_data;    
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;     
    
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);  
SET @quantity := floor(10 + rand() * 90);  
INSERT INTO temp_sales_order_data VALUES (126, 1, 1, @order_date, @request_delivery_date, @order_date, @amount, @quantity);    
    
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);    
SET @quantity := floor(10 + rand() * 90);  
INSERT INTO temp_sales_order_data VALUES (127, 2, 2, @order_date, @request_delivery_date, @order_date, @amount, @quantity);    
    
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);  
SET @quantity := floor(10 + rand() * 90);    
INSERT INTO temp_sales_order_data VALUES (128, 3, 3, @order_date, @request_delivery_date, @order_date, @amount, @quantity);    
    
INSERT INTO sales_order    
SELECT NULL,customer_number,product_code,order_date,request_delivery_date,entry_date,order_amount,order_quantity FROM temp_sales_order_data ORDER BY order_date;      
  
COMMIT ;

        修改后的销售订单源数据如下图所示,最后三条含有交货日期。


(2)修改rds.cdc_time的值
USE rds;
INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-17', '2016-07-17' FROM rds.cdc_time;
(3)执行定期装载并查看结果。
        使用下面的命令执行定期装载。
./regular_etl.sh
        使用下面的查询验证结果。
use dw;
select a.order_sk, request_delivery_date_sk, c.date
  from sales_order_fact a, date_dim b, date_dim c
 where a.order_date_sk = b.date_sk 
   and a.request_delivery_date_sk = c.date_sk ;
        查询结果如下图所示,可以看到只有三个新的销售订单具有request_delivery_date_sk值,是2016年7月20日。

5. 使用角色扮演维度查询
-- 使用表别名查询
USE dw;  
  
SELECT   
    order_date_dim.date order_date,  
    request_delivery_date_dim.date request_delivery_date,  
    SUM(order_amount),  
    COUNT(*)  
FROM  
    sales_order_fact a,  
    date_dim order_date_dim,  
    date_dim request_delivery_date_dim  
WHERE  
    a.order_date_sk = order_date_dim.date_sk  
        AND a.request_delivery_date_sk = request_delivery_date_dim.date_sk  
GROUP BY order_date_dim.date , request_delivery_date_dim.date  
CLUSTER BY order_date_dim.date , request_delivery_date_dim.date;

-- 使用视图查询
USE dw;  
  
CREATE VIEW order_date_dim 
(order_date_sk, order_date, month, month_name,  quarter, year, promo_ind) 
AS SELECT * FROM date_dim;  
  
CREATE VIEW request_delivery_date_dim 
(request_delivery_date_sk, request_delivery_date, month, month_name, quarter, year, promo_ind) 
AS SELECT * FROM date_dim;

SELECT   
    order_date,  
    request_delivery_date,  
    SUM(order_amount),  
    COUNT(*)  
FROM  
    sales_order_fact a,  
    order_date_dim b,  
    request_delivery_date_dim c  
WHERE  
    a.order_date_sk = b.order_date_sk  
        AND a.request_delivery_date_sk = c.request_delivery_date_sk  
GROUP BY order_date , request_delivery_date  
CLUSTER BY order_date , request_delivery_date;

        上面两个查询的结果相同,如下图所示:


文章评论

聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
Java程序员必看电影
Java程序员必看电影
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
程序员和编码员之间的区别
程序员和编码员之间的区别
老程序员的下场
老程序员的下场
 程序员的样子
程序员的样子
代码女神横空出世
代码女神横空出世
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
我的丈夫是个程序员
我的丈夫是个程序员
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
我是如何打败拖延症的
我是如何打败拖延症的
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
编程语言是女人
编程语言是女人
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
每天工作4小时的程序员
每天工作4小时的程序员
总结2014中国互联网十大段子
总结2014中国互联网十大段子
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
程序员应该关注的一些事儿
程序员应该关注的一些事儿
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
旅行,写作,编程
旅行,写作,编程
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
鲜为人知的编程真相
鲜为人知的编程真相
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
为什么程序员都是夜猫子
为什么程序员都是夜猫子
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
那些争议最大的编程观点
那些争议最大的编程观点
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
中美印日四国程序员比较
中美印日四国程序员比较
程序员的鄙视链
程序员的鄙视链
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
漫画:程序员的工作
漫画:程序员的工作
如何成为一名黑客
如何成为一名黑客
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有