Oracle的pipelined函数实现高性能大数据处理

转自:http://mikixiyou.iteye.com/blog/1673672

在plsql开发中,会涉及到一些大数据量表的数据处理,如将某记录数超亿的表的记录经过处理转换插入到另外一张或几张表。

常规的操作方法固然可以实现,但时间、磁盘IO、redo日志等等都非常大。Oracle 提供了一种高级函数,可以将这种数据处理的性能提升到极限。这种函数称为管道函数。

在实际项目中,管道函数会和表函数、数据流函数(即表函数和CURSOR结合)、数据集合、并行度一起使用,达到大数据处理的性能顶峰。


(miki西游 @mikixiyou 原文链接: http://mikixiyou.iteye.com/blog/1673672)


下面是一个例子,将表t_ss_normal的记录插入到表t_target中,插入过程中有部分转换操作。

我分成四个方法来实现这个数据处理操作。


第一个方法,也是最常规的方法,代码如下:


Sql代码 复制代码 收藏代码
  1. create table T_SS_NORMAL 
  2.   owner          VARCHAR2(30), 
  3.   object_name    VARCHAR2(128), 
  4.   subobject_name VARCHAR2(30), 
  5.   object_id      NUMBER, 
  6.   data_object_id NUMBER, 
  7.   object_type    VARCHAR2(19), 
  8.   created        DATE
  9.   last_ddl_time  DATE
  10.   timestamp      VARCHAR2(19), 
  11.   status         VARCHAR2(7), 
  12.   temporary      VARCHAR2(1), 
  13.   generated      VARCHAR2(1), 
  14.   secondary      VARCHAR2(1) 
  15. ); 
  16.  
  17. create table T_TARGET 
  18.   owner       VARCHAR2(30), 
  19.   object_name VARCHAR2(128), 
  20.   comm        VARCHAR2(10) 
  21. ); 


这是源表和目标表的表结构。现在源表有200W条,其数据来自dba_objects视图。


Sql代码 复制代码 收藏代码
  1. create or replace package pkg_test is 
  2.   procedure load_target_normal; 
  3. end pkg_test; 
  4.  
  5. create or replace package body pkg_test is 
  6.   procedure load_target_normal is 
  7.   begin   
  8.     insert into t_target (owner, object_name, comm) 
  9.       select owner, object_name, 'xxx' from t_ss_normal;   
  10.     commit;   
  11.   end
  12. begin 
  13.   null
  14. end pkg_test;  



一个insert into select语句搞定这个数据处理,简单。


第二方法,采用管道函数实现这个数据处理。


Sql代码 复制代码 收藏代码
  1. create type obj_target as object( 
  2. owner VARCHAR2(30), object_name VARCHAR2(128), comm varchar2(10) 
  3. ); 
  4. create or replace type typ_array_target as table of obj_target; 
  5.  
  6. create or replace package pkg_test is 
  7.  
  8.   function pipe_target(p_source_data in sys_refcursor) return typ_array_target 
  9.     pipelined; 
  10.  
  11.   procedure load_target; 
  12. end pkg_test; 



首先创建两个自定义的类型。obj_target的定义和t_target的表结构一致,用于存储每一条目标表记录。typ_array_target用于管道函数的返回值。

接着定义一个管道函数。

普通函数的结尾加一个pipelined关键字,就是管道函数。这个函数的返回参数类型为集合,这是为了使其能作为表函数使用。表函数就是在from子句中以table(v_resultset)调用的,v_resultset就是一个集合类型的参数。

最后定义一个调用存储过程。


在包体中定义该管道函数和调用存储过程。管道函数pipe_target的传入参数一个sys_refcursor类型。这是一个游标,可以理解为使用select * from table才能得到的结果集。

你也可以不用这个传入的游标,取而代之,在函数中定义一个游标,也一样使用。


Sql代码 复制代码 收藏代码
  1.   function pipe_target(p_source_data in sys_refcursor) return typ_array_target 
  2.     pipelined is 
  3.     r_target_data obj_target := obj_target(null, null, null); 
  4.     r_source_data t_ss%rowtype; 
  5.  
  6. begin 
  7.    
  8.     loop 
  9.       fetch p_source_data 
  10.         into r_source_data; 
  11.       exit when p_source_data%notfound;     
  12.        
  13.       r_target_data.owner       := r_source_data.owner; 
  14.       r_target_data.object_name := r_source_data.object_name; 
  15.       r_target_data.comm        := 'xxx';     
  16.       pipe row(r_target_data); 
  17.      
  18.     end loop; 
  19.    
  20.     close p_source_data; 
  21.     return
  22.    
  23.   end
  24.  
  25.   procedure load_target is 
  26.   begin   
  27.     insert into t_target 
  28.       (owner, object_name, comm) 
  29.       select owner, object_name, comm 
  30.         from table(pipe_target(cursor(select * from t_ss_normal)));   
  31.     commit;   
  32.   end
  33.   


关键字 pipe row 的作用是将obj_target插入到typ_array_target类型的数组中,管道函数自动返回这些数据。


因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。这种转换称为上下文切换。



Sql代码 复制代码 收藏代码
  1. function pipe_target_array(p_source_data in sys_refcursor, 
  2.                            p_limit_size  in pls_integer default c_default_limit) 
  3.   return typ_array_target 
  4.   pipelined is   
  5.   r_target_data obj_target := obj_target(null, null, null);  
  6.     
  7.   type typ_source_data is table of t_ss%rowtype index by pls_integer; 
  8.   aa_source_data typ_source_data; 
  9.  
  10. begin 
  11.  
  12.   loop 
  13.     fetch p_source_data bulk collect 
  14.       into aa_source_data; 
  15.     exit when aa_source_data.count = 0; 
  16.    
  17.     for i in 1 .. aa_source_data.count loop 
  18.      
  19.       r_target_data.owner       := aa_source_data(i).owner; 
  20.       r_target_data.object_name := aa_source_data(i).object_name; 
  21.       r_target_data.comm        := 'xxx'
  22.      
  23.       pipe row(r_target_data); 
  24.      
  25.     end loop; 
  26.    
  27.   end loop; 
  28.  
  29.   close p_source_data; 
  30.   return
  31.  
  32. end
  33.  
  34.  
  35. procedure load_target_array is 
  36. begin 
  37.   insert into t_target 
  38.     (owner, object_name, comm) 
  39.     select owner, object_name, comm 
  40.       from table(pipe_target_array(cursor (select * from t_ss_normal), 
  41.                                    100));   
  42.   commit;   
  43. end


还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。



Sql代码 复制代码 收藏代码
  1. function pipe_target_parallel(p_source_data in sys_refcursor, 
  2.                               p_limit_size  in pls_integer default c_default_limit) 
  3.   return typ_array_target 
  4.   pipelined 
  5.   parallel_enable(partition p_source_data by any) is 
  6.  
  7.   r_target_data obj_target := obj_target(null, null, null); 
  8.  
  9.   type typ_source_data is table of t_ss%rowtype index by pls_integer;   
  10.   aa_source_data typ_source_data; 
  11.  
  12. begin   
  13.   loop 
  14.     fetch p_source_data bulk collect 
  15.       into aa_source_data; 
  16.     exit when aa_source_data.count = 0;     
  17.     for i in 1 .. aa_source_data.count loop       
  18.       r_target_data.owner       := aa_source_data(i).owner; 
  19.       r_target_data.object_name := aa_source_data(i).object_name; 
  20.       r_target_data.comm        := 'xxx';       
  21.       pipe row(r_target_data);       
  22.     end loop;     
  23.   end loop;   
  24.   close p_source_data; 
  25.   return
  26.  
  27. end
  28.  
  29.  
  30. procedure load_target_parallel is 
  31. begin 
  32.   execute immediate 'alter session enable parallel dml';   
  33.   insert /*+parallel(t,4)*/ 
  34.   into t_target t 
  35.     (owner, object_name, comm) 
  36.     select owner, object_name, comm 
  37.       from table(pipe_target_array(cursor (select /*+parallel(s,4)*/ 
  38.                                      * 
  39.                                       from t_ss_normal s), 
  40.                                    100));   
  41.   commit
  42. end




在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。


请使用浏览器的分享功能分享到微信等