Table Functions and Cursor Expressions
Pages: 1, 2, 3, 4, 5
Fanout: Using Table Functions with Side Effects
Sometimes the specification for the transformation to be implemented as a table function explicitly excludes source data with certain characteristics. In such cases, it's useful to report on the excluded source data and often most convenient to direct the report to the database for further analysis. A table function may do DML, provided that this is done within an autonomous transaction, as shown in Example 11.
Example 11. Using autonomous transactions to allow for intended "side effects."
create or replace function lookups_fn_with_side_effect
return my_types.lookups_tab
pipelined
/*
uses...
create table exclusions ( n number );
*/
is
pragma autonomous_transaction;
v_row My_Types.lookup_row;
begin
for j in 1..15
loop
case
when j < 11 then
case j
when 1 then v_row.idx := 1; v_row.text := 'one';
when 2 then v_row.idx := 2; v_row.text := 'TWO';
when 3 then v_row.idx := 3; v_row.text := 'three';
when 4 then v_row.idx := 4; v_row.text := 'FOUR';
when 5 then v_row.idx := 5; v_row.text := 'five';
when 6 then v_row.idx := 6; v_row.text := 'SIX';
when 7 then v_row.idx := 7; v_row.text := 'seven';
else v_row.idx := j; v_row.text := 'other';
end case;
pipe row ( v_row );
else
insert into exclusions values ( j );
end case;
end loop;
commit;
return;
end Lookups_Fn_With_Side_Effect;
Parallelizing Table Function Execution: New in Oracle 9i
It's beyond the scope of this article to describe the details of Oracle's parallel query feature. Suffice it to say that when certain environment conditions are met (especially a hardware environment that supports multiple concurrently executing processes making concurrent disk accesses, and a user environment close to single-user) and when the objects referenced in a query have appropriate parallel attributes, then the elapsed time for long-running queries can be cut in direct proportion to the number of available CPUs. This is especially significant in Decision Support Systems (a.k.a. DSS) both at query time and in the Extraction, Transformation, and Load (a.k.a. ETL) operations to populate them.
Oracle 9i introduces table function features to allow their execution to be parallelized.
These features require (with one small exception, discussed shortly) that the table function
has exactly one strongly typed ref cursor input parameter. Let's take a look at the different
ways in which table functions can be designed for parallel execution.
Special Case: Function Behavior is Independent of Input Data Partitioning
Consider a function that processes each row from its input cursor individually. (Such a transformation, which generates two or more output rows from each input row- generically referred to as piviotting-benefits particularly from a table function implementation.) The syntax to parallelize this is straightforward and is shown in Example 12.
Example 12. Parallel execution for arbitrary partitioning of data.
create or replace function Rowwise_Xform_Fn (
p_input_rows in SYS_REFCURSOR )
return My_Types.xforms_tab
pipelined
parallel_enable ( partition p_input_rows by any )
is
v_in_row My_Types.input_row;
v_out_row My_Types.xform_row;
begin
loop
fetch p_input_rows into v_in_row;
exit when p_input_rows%notfound;
v_out_row.n := v_in_row.n*2;
v_out_row.typ := 'a';
pipe row ( v_out_row );
v_out_row.n := v_in_row.n*3;
v_out_row.typ := 'b';
pipe row ( v_out_row );
end loop;
close p_input_rows;
return;
end Rowwise_Xform_Fn;
See Example 13 for the complete working example. They keyword any expresses the
programmer's assertion that the results are independent of the order in which the function
gets the input rows. When this keyword is used, the runtime system randomly partitions
the data among the query slaves. This keyword is appropriate for use with functions that
take in one row, manipulate its columns, and generate output row(s) based on the columns
of this row only. (Of course, if this assertion doesn't hold, the output won't be predictable.)
This is the small exception referred to earlier: The input ref cursor need not be strongly
typed to be partitioned by any.
Example 13. Algorithm is independent of the ordering of the source rows.
create or replace package My_Types is
type input_row is record ( n number );
type cur_t is ref cursor return input_row;
type xform_row is record ( n number, typ char(1) );
type xforms_tab is table of xform_row;
end My_Types;
create table t ( n number );
begin
for j in 1..1000
loop insert into t ( n ) values ( j ); end loop;
commit;
end;
create or replace function Rowwise_Xform_Fn (
p_input_rows in My_Types.cur_t )
return My_Types.xforms_tab
pipelined
parallel_enable ( partition p_input_rows by any )
is
v_in_row My_Types.input_row;
v_out_row My_Types.xform_row;
begin
loop
fetch p_input_rows into v_in_row;
exit when p_input_rows%notfound;
v_out_row.n := v_in_row.n * 2; v_out_row.typ := 'a';
pipe row ( v_out_row );
v_out_row.n := v_in_row.n * 3; v_out_row.typ := 'b';
pipe row ( v_out_row );
end loop;
close p_input_rows;
return;
end Rowwise_Xform_Fn;
select * from table (
Rowwise_Xform_Fn ( cursor ( select n from t ) ) )
where rownum < 11;
The ability to exploit the parallel potential of a table function depends on whether the source can be parallelized.
General Case: Function Behavior Depends on Input Data Partitioning
Consider a transformation along the lines of:
select avg ( salary ), department_id
from employees
group by department_id;
where the aggregation operation to be performed on the set of salaries for a given department is arbitrarily complex such that a classical SQL implementation is impossible, slow by virtue of a function invocation for each row of the source table, or prohibitively challenging to write and debug. For example, it might be that the cost to the employer of paying a given salary depends on the hire date because of changes in benefits packages that affect only employees hired after the date of change. This is illustrated in Example 14, but to avoid obscuring it with a complicated algorithm, the aggregation is simply the sum for the salary for each distinct department. This has the general form shown in Example 15.
Example 14. Algorithm requires only that the source rows are clustered.
Note: In order to avoid having to make the algorithm distractingly complex, the following
DELETE should be issued:
delete from employees where department_id is null;
before continuing thus:
create or replace package My_Types is
type dept_sal_row is record
( sal number(8,2), dept number(4) );
type cur_t is ref cursor return dept_sal_row;
type dept_sals_tab is table of dept_sal_row;
end My_Types;
create or replace function Aggregate_Xform
( p_input_rows in My_Types.cur_t )
return My_Types.dept_sals_tab
pipelined
--[ cluster / order ] p_input_rows by (dept)
--parallel_enable
-- ( partition p_input_rows by [ hash / range] (dept) )
is
g_in_row My_Types.dept_sal_row;
g_out_row My_Types.dept_sal_row;
g_first_time boolean := true;
g_last_dept number := null;
g_got_a_row boolean;
g_new_dept boolean;
g_current_dept employees.department_id%type;
g_prev_dept employees.department_id%type;
v_total_sal number;
procedure Get_Next_Row is begin
fetch p_input_rows into g_in_row;
g_got_a_row := not p_input_rows%notfound;
if g_got_a_row
then
case g_first_time
when true then
g_first_time := false;
g_new_dept := false;
else
g_new_dept := g_prev_dept <> g_in_row.dept;
end case;
g_prev_dept := g_in_row.dept;
end if;
return;
end Get_Next_Row;
function Got_Next_Dept return boolean is begin
g_current_dept := g_in_row.dept;
g_new_dept := false;
return g_got_a_row;
end Got_Next_Dept;
function Got_Next_Row_In_Dept return boolean is begin
return ( not g_new_dept ) and g_got_a_row;
end Got_Next_Row_In_Dept;
begin
Get_Next_Row();
while Got_Next_Dept()
loop
v_total_sal := 0;
while Got_Next_Row_In_Dept()
loop
v_total_sal := v_total_sal + g_in_row.sal;
Get_Next_Row();
end loop;
g_out_row.sal := v_total_sal;
g_out_row.dept := g_current_dept;
pipe row ( g_out_row );
end loop;
close p_input_rows;
return;
end Aggregate_Xform;
Example 15. General form of a function performing an aggregate transformation.
create or replace function Aggregate_Xform (
p_input_rows in My_Types.cur_t )
return My_Types.dept_sals_tab
pipelined
is
...
begin
Get_Next_Row();
while Got_Next_Dept() /* relies on assumption that
all rows for given dept are
delivered consecutively */
loop
v_total_sal := 0;
while Got_Next_Row_In_Dept()
loop
v_total_sal := v_total_sal + g_in_row.sal;
Get_Next_Row();
end loop;
g_out_row.sal := v_total_sal;
g_out_row.dept := g_current_dept;
pipe row ( g_out_row );
end loop;
close p_input_rows;
return;
end Aggregate_Xform;
Given that the input rows will be partitioned between different slaves, the integrity of the algorithm requires that all the rows for a given department go to the same slave, and that all these rows are delivered consecutively. (Strictly speaking, the requirement for consecutive delivery is negotiable, but the design of the algorithm to handle this case would need to be much more elaborate. For that reason, Oracle commits to consecutive delivery.) We use the term clustered to signify this type of delivery, and cluster key for the column (in this case, "department") on which the aggregation is done. But significantly, the algorithm does not care in what order of cluster key it receives each successive cluster, and Oracle doesn't guarantee any particular order here.
This allows the possibility of a quicker algorithm than if rows were required to be
clustered and delivered in order of the cluster key. It scales as order N rather than order
N.log(N), where N is the number of rows. The syntax to accomplish this is shown here:
create or replace function aggregate_xform (
p_input_rows in my_types.cur_t )
return my_types.dept_sals_tab
pipelined
cluster p_input_rows by (dept)
parallel_enable
( partition p_input_rows by hash (dept) )
is ...
We can choose between hash (dept) and range (dept) depending on what we know about the distribution of the values. (Hash will be quicker than range and is the natural
choice to be used with cluster... by.) Here, to be partitioned by a specified column, the
input ref cursor must be strongly typed. Cluster... by isn't allowed without parallel_enable (partition... by).
Note: At version 9.0.1, it's necessary to include ORDER BY on the cluster key in the
SELECT used to invoke the table function as follows, in order to preserve correctness of
behavior, but this restriction will be removed when the order N clustering algorithm is
productized:
select * from table (
Aggregate_Xform (
cursor (
select salary, department_id
from employees
where department_id is not null
order by department_id ) ) );



