
通常为了尽可能的减少对业务系统的压力和性能影响,或者因为网络传输异构数据库等原因,一般都是根据特定的增量抽取原则,将数据从业务数据库导出到flat文本文件或者XML文件中,也叫数据缓存区或者数据登台区(这名字起得特别别扭)。本文讨论的是从业务数据库直接抽取到数据仓库。数据仓库是一种体系架构,而不是一种纯粹的技术。实际上,大多数数据库都提供了类似的不同数据库直接连接的接口,例如SQLServer的链接数据库,Oracle的透明网关等等。
关于数据的增量抽取也是一个重要的讨论话题,其原因主要是在超大数据量情况下任何关系数据库都无法满足数据处理的要求。在《数据仓库》(Inmon)一书中,主要描述了以下3种方法:
1、数据增量抽取,主要是基于时间戳的
2、扫描增量文件,实际上就是关系数据库的归档日志。
3、前后映像对比
当然每种方法都有其优势和劣势,本文旨在讨论基于时间戳的数据增量抽取的实现,无意探讨和比较这三种方法的优劣。
当然在进行基于时间戳的数据增量处理之前,首先要满足以下假设。
1、假设在业务数据库中存在着一个特定的时间属性,作为增量抽取的唯一标识。
2、假设在这个字段上存在着索引字段。这样我们的数据增量抽取模拟脚本就不会遭遇到
性能瓶颈。当然我们还会通过将大事务尽可能变成小事务的原则进行优化。
3、假设业务数据库和数据仓库能够以某种方式直接连接。
4、抽取过程中,尽量避免数据转换、清洗的动作,以减少对业务数据库的性能影响。
在满足了以上条件之后,我们才能进一步考虑数据增量抽取脚本的实现。
下面开始对基于时间戳的数据增量抽取进行系统设计:
1、建立链接数据库。
2、首先需要定义一张数据字典表,定义需要进行处理的任务,其中主要包括业务数据库和目
标数据库的表名、字段列表、以及where条件等。
3、有了这张字典表就可以开始进行工作了,为了方便表达,暂时处理成伪代码形式,同时只
以一个表的处理为例。
4、有一点要主要的是,在SQLServer中有两种使用链接数据库的方法:
OPENQUERY ( linked_server , 'query' )linked_server_name.catalog.schema.object_name 的四部分名称
这两种方法各有利弊,第二种容易阅读一些;第一种方法据说把语句提交到源数据库执行的,效率可能会高些(实际的资料并未找到)。
其次这两种方法在使用起来语法有点差别,第一种方法采用的是宿主数据库的语法形式,第二种方法采用的是SQLServer本身的语法形式。因此在写脚本的时候也会有所不同。主要差别是在字段列表和条件处,暂时采用第一种方式。
本文主要是针对关于数据增量抽取的模拟实现——原理进行实现的
实现的环境:
业务数据库:Oracle9i
数据仓库数据库:SQLServer2000
1、前提SQLServer服务器已经安装Oracle驱动,不再详细累述
2、创建链接数据库
打开企业管理器->安全性-链接服务器-右键新建
BEGIN
DECLARE @BeginDate DATETIME,
@EndDate DATETIME,
@TaskName VARCHAR(32),
@Flag INTEGER,
@Num INTEGER,
@CurrDate DATETIME
SELECT @Num = COUNT(TaskName) FROM ExtractTaskList
WHERE UPPER(TaskName) = UPPER('test')
IF @Num != 1
INSERT INTO ExtractTaskList(TaskName,IncType,TransType) VALUES('test',2,2)
--获取列表中的当前任务的时间戳和状态
SELECT @BeginDate = SourceDate,@Flag = Flag FROM ExtractTaskList WHERE TaskName='TEST'
--如果上次执行未成功,这样取值效率会高一些,则从数据仓库表中直接读取
--TargetDate和SourceDate可能会不一致
IF @Flag = 2 OR @Flag IS NULL
SELECT @BeginDate = DATEADD(ss,1,MAX(closedate)) FROM TargetTable
--如果数据仓库无数据,则从业务系统中直接读取,也可以设置一个默认的初始化时间
IF @BeginDate IS NULL
SELECT @BeginDate = MinLogDate FROM OPENQUERY(SOURCE,'SELECT MIN(CloseDate) AS MinLogDate FROM SourceTable')
--如果仍无数据,则表示无数据可抽取,退出执行
IF @BeginDate IS NULL
RETURN
--抽取结束时间为当前时间前一天,每次循环抽取1天数据,可以更改dd为hh,变成按小时抽取--通常业务系统是连续的,如果有疑问也可以从业务系统中获取最大时间
SELECT @EndDate = CONVERT(DATETIME,LEFT(CONVERT(VARCHAR,GETDATE(),120),10)+'
00:00:00')
--更新当前开始时间和结束时间
UPDATE ExtractTaskList
SET TargetDate = @BeginDate,
SourceDate = @EndDate
WHERE UPPER(TaskName) = UPPER('test')
WHILE @BeginDate < @EndDate
BEGINSELECT @sql = ' INSERT INTO TargetTable
(
ID1,
ID2,
Measure1,
Measure2,
CloseDate
)SELECT * FROM OPENQUERY(SOURCE,''select
ID1,
ID2,
Measure1,
Measure2,
CloseDate
FROM SourceTable
WHERE CloseDate >= TO_DATE(''''' + CONVERT(varchar,@BeginDate,120) + ''''',
''''YYYY-MM-DD HH24:MI:SS'
+ ''''') AND CloseDate < TO_DATE(''''' +
CONVERT(varchar,DATEADD(day,1,@BeginDate),120) + ''''', ''''yyyy-mm-dd HH24:MI:SS' + ''''') AND CloseDate < TO_DATE(''''' + CONVERT(varchar,@EndDate,120) + ''''',
''''YYYY-MM-DD HH24:MI:SS'
+ ''''')'')'
--PRINT @sql
EXEC (@sql)
--获取本次任务运行抽取的最大时间
IF DATEADD(day,1,@BeginDate)>@EndDate
SELECT @CurrDate = @EndDate
ELSE
SELECT @CurrDate = DATEADD(day,1,@BeginDate)
--如果@sql执行失败,同样记录状态和时间
IF @@ERROR <> 0
GOTO FAIL
--记录每次运行的时间运行情况,可提供相应参考
UPDATE ExtractTaskList
SET TargetDate = @CurrDate,
Flag = 1
WHERE UPPER(TaskName) = UPPER('test')
SELECT @BeginDate = DATEADD(DD,1,@BeginDate)
END
在数据仓库中通常会存储双重粒度级别的数据来满足不同的需要,轻度综合数据和原始数据,在原始数据层面上可以访问细节数据,而在分析层面则访问轻度综合数据。
原始数据因为存储了基本上原封不动的数据,导致系统存储压力增大,同时也造成查询使系统性能的下降;而轻度综合数据由于数据进行压缩更为简洁,通常情况下对数据仓库的访问95%以上都是通过轻度综合数据访问来进行的。
数据增量聚合的实现和增量抽取类似,都是尽量以时间戳的方式,尽量减少每次事务的开销。
下面开始对基于时间戳的数据增量聚合进行系统设计:
1、首先需要定义一张数据字典表,定义需要进行处理的任务,其中主要包括任务名称,任务
描述,本次聚合开始时间、结束时间、当前时间、执行的系统时间,状态,最大时限等等。
2、有了这张字典表就可以开始进行工作了,为了方便表达,暂时处理成伪代码形式,同时只
以一个表的处理为例。
本文主要是针对数据仓库中的事实表汇总或者聚集进行模拟实现的
实现的环境:
数据仓库数据库:SQLServer2000
创建表脚本
CREATE TABLE t_fact_RunStatus
(
factProcName VARCHAR(40),
factDesc VARCHAR(100),
factType INT,
LastLogTime DATETIME,
BeginTime DATETIME,
EndTime DATETIME,
Status VARCHAR(20),
LogLimit INT,
Step INT,
CancelFlag INT,
CurTime DATETIME
)
GO
CREATE TABLE t_org_table
(
LogDate DATETIME,
Dim1 INT,
Dim2 INT,
Value1 NUMERIC(10,0),
Value2 NUMERIC(10,0)
)
GO
CREATE TABLE t_fact_table
(
LogDate DATETIME,
Dim1 INT,
Dim2 INT,
Measure1 NUMERIC(10,0),
Measure2 NUMERIC(10,0)
)
GO
创建存储过程脚本
CREATE PROCEDURE p_fact_xxx
AS
DECLARE @step INTEGER --获取本次处理距离当前最大时间间隔DECLARE @LogLimit INTEGER --获取本次处理最大时间间隔DECLARE @BeginTime DATETIME --获取本次处理开始时间
DECLARE @EndTime DATETIME --获取本次处理结束时间
DECLARE @Num INTEGER --获取是否存在此次任务
DECLARE @TimeTemp VARCHAR(20) --时间临时中间变量
DECLARE @StatSecTime INTEGER --每次处理的步增情况,默认为1,即1小时DECLARE @LastLogTime DATETIME --每次处理的当前时间点
DECLARE @Status VARCHAR(20) --本次处理的状态,EXCEPTION和FINISHED两种情况DECLARE @CancelFlag INTEGER --取消情况,1和0
BEGIN
SELECT @StatSecTime = 1
--获取上次处理的最后时间,状态和取消情况
SELECT @LastLogTime = LastLogTime,@Status = Status,@CancelFlag = CancelFlag
FROM t_fact_RunStatus
WHERE UPPER(factProcName) = UPPER('p_fact_table')
--假如为取消该任务,则直接退出
IF @CancelFlag = 1 OR @CancelFlag IS NULL
RETURN
--判断是否存在此次任务
SELECT @Num = COUNT(factProcName) FROM t_fact_RunStatus
WHERE UPPER(factProcName) = UPPER('p_fact_table')
--存在更新任务状态,没有则新增一个任务
IF @Num = 1
UPDATE t_fact_RunStatus set Status = 'RUNNING',CurTime=GETDATE()
WHERE UPPER(factProcName) = UPPER('p_fact_table')
ELSE
BEGIN
IF @Num > 1
DELETE FROM t_fact_RunStatus
WHERE UPPER(factProcName) = UPPER('p_fact_table')
INSERT INTO t_fact_RunStatus(factProcName, factDesc, factType,Status, LogLimit, Step, CancelFlag,CurTime)
VALUES(UPPER('p_fact_table'), 'XXXXXX', 60,'RUNNING', 24, 120, 0,GETDATE())
END
--获取本次任务开始的开始时间和结束时间
BEGIN
--如果任务没有正常终结,则从事实表中获取
--否则获取通过配置表获取,不过此处存在一定风险,既没有考虑到事实上的时间盲点IF @Status != 'FINISHED'
SELECT @BeginTime = DATEADD(hh,@StatSecTime,max(LogDate)) FROM t_fact_table ELSE
SELECT @BeginTime = DATEADD(hh,@StatSecTime,@LastLogTime)
IF @BeginTime IS NULL
SELECT @BeginTime = MIN(LogDate) FROM t_org_table
IF @BeginTime IS NULL
BEGIN
UPDATE t_fact_RunStatus SET Status = 'EXCEPTION'
WHERE UPPER(factProcName) = UPPER('p_fact_table')
RETURN
END
--时间取整,此处为整点小时
SELECT @TimeTemp = CONVERT(VARCHAR, @BeginTime,112)
SELECT @TimeTemp = @TimeTemp + ' ' + DATENAME(hour, @BeginTime)
SELECT @TimeTemp = @TimeTemp + ':' + '00:00'
SELECT @BeginTime = CONVERT(DATETIME, @TimeTemp)
--获取距离当前时间最大间隔和当前处理的最大间隔
SELECT @Step = ISNULL(Step, 120),@LogLimit = ISNULL(LogLimit, 24)
FROM t_fact_RunStatus
WHERE UPPER(factProcName) = UPPER('p_fact_table')
SELECT @EndTime = DATEADD(minute, -@step, getdate())
--如果要处理的时间间隔大于指定间隔,则从原始记录表中获取开始时间和结束时间
--否则直接对结束时间进行截取
IF DATEDIFF(hour, @BeginTime, @EndTime) > @LogLimit
BEGIN
SELECT @BeginTime = MIN(LogDate) FROM t_org_table WHERE LogDate >= @BeginTime SELECT @TimeTemp = CONVERT(VARCHAR, @BeginTime,112)
SELECT @TimeTemp = @TimeTemp + ' ' + DATENAME(hour, @BeginTime)
SELECT @TimeTemp = @TimeTemp + ':' + '00:00'
SELECT @BeginTime = CONVERT(DATETIME, @TimeTemp)
SELECT @EndTime = DATEADD(HOUR, @LogLimit, @BeginTime)
END
ELSE
BEGIN
SELECT @TimeTemp = CONVERT(VARCHAR, @EndTime,112)
SELECT @TimeTemp = @TimeTemp + ' ' + DATENAME(hour, @EndTime)
SELECT @TimeTemp = @TimeTemp + ':' + '00:00'SELECT @EndTime = CONVERT(DATETIME, @TimeTemp) END
END
--更新开始和结束时间
UPDATE t_fact_RunStatus
SET BeginTime = @BeginTime,
EndTime = @EndTime
WHERE UPPER(factProcName) = UPPER('p_fact_table')
--为了减少系统压力,循环进行数据处理
WHILE @BeginTime < @EndTime
BEGIN
BEGIN TRAN Tran_Fact
INSERT INTO t_fact_table
(
LogDate,
Dim1 ,
Dim2 ,
Measure1 ,
Measure2
)
SELECT
@BeginTime,
Dim1,
Dim2,
SUM(Value1),
SUM(Value2)
FROM t_org_table a
WHERE LogDate >= @BeginTime
AND LogDate < DATEADD(hour,@StatSecTime, @BeginTime) GROUP BY Dim1,Dim2
IF @@ERROR <> 0
GOTO FAIL
--每次处理完,必须更新当前的时间标志
UPDATE t_fact_RunStatus
SET LastLogTime=@BeginTime,
CurTime = GETDATE(),
Status = 'FINISHED'
WHERE UPPER(factProcName) = UPPER('p_fact_table')
