Question about executor

Hi, I am reading the HeavyDB code/docs and have some questions about the executor.

From my understanding, the execution logic can be summarized as follows:
Calcite logical plan → optimized DAG (after HeavyDB's DAG optimization) → query steps (each node is translated into a query step) → work units (each query step is converted into a work unit) → work-unit execution (including both JIT compilation and execution).
Please correct my understanding if I am wrong!!

I have some questions about the execution process:

  1. Suppose we have a single GPU: are different work units executed sequentially? For example, work unit 1 is a join and work unit 2 is a group by (assume the join and group by are not fused into a compound node). Will the executor materialize all the join results before executing the group by?
  2. How is data passed between different work units? And what if the intermediate results are too big? Continuing the example above: you don't know the size of the join output until you execute it, but you need to prepare the intermediate result buffer when creating the work unit, so how do you know how much buffer space to allocate? Moreover, the intermediate join result might be bigger than GPU memory; do you just fail, or offload to CPU?
  3. I notice HeavyDB has the fusion optimization described here: 7.2. DAG Builder / Optimizer — OmniSciDB documentation. I'm wondering about the granularity of the fusion. Do you just coalesce the kernel launches (i.e., still launch multiple kernels for different logical operators), or do you combine different operations on the same record (i.e., launch one kernel that performs multiple operations, such as group by, project, and filter)?

Thanks a lot!! Any comments are highly appreciated!

Hi @Lily_Liu,

I’ll try answering your questions.

  1. On a typical workload with one or more joins, a group by, and some filtering, everything runs inlined: the join first, then the filter, and finally the group by. So in such a simple case no intermediate result is created, and data is supposed to stay in registers. You get intermediate results when you run a multi-step query.

As an example, a query like this:

heavysql> explain select C_NAME,count(*) from customer join nation on C_NATIONKEY = N_NATIONKEY group by C_NAME;

is served by this code:

IR for the GPU:
===============

; Function Attrs: nounwind uwtable
define dso_local void @multifrag_query_hoisted_literals(i8*** readonly, i64* nocapture readonly, i8* readnone, i64* readnone, i64* readnone, i32* readnone, i32* readnone, i64* readnone, i64** readnone, i32* readnone, i32* nocapture readonly, i64* readnone) local_unnamed_addr #22 {
  %13 = load i64, i64* %1, align 8, !tbaa !9
  %14 = icmp eq i64 %13, 0
  br i1 %14, label %31, label %15

15:                                               ; preds = %12
  %16 = icmp eq i8*** %0, null
  br i1 %16, label %17, label %32

17:                                               ; preds = %26, %15
  %18 = phi i32 [ %27, %26 ], [ 0, %15 ]
  %19 = load i32, i32* %10, align 4, !tbaa !7
  %20 = mul i32 %19, %18
  %21 = zext i32 %20 to i64
  %22 = getelementptr inbounds i64, i64* %3, i64 %21
  %23 = getelementptr inbounds i64, i64* %4, i64 %21
  call void @query_group_by_template(i8** null, i8* %2, i64* %22, i64* %23, i32* %5, i64* %7, i64** %8, i32 %18, i64* %11, i32* %6, i32* %9)
  br label %.error_check

.error_check:                                     ; preds = %17
  %24 = call i32 @get_error_code(i32* %9)
  %25 = icmp ne i32 %24, 0
  br i1 %25, label %.error_exit, label %26

.error_exit:                                      ; preds = %.error_check
  call void @record_error_code(i32 %24, i32* %9)
  ret void

26:                                               ; preds = %.error_check
  %27 = add i32 %18, 1
  %28 = zext i32 %27 to i64
  %29 = load i64, i64* %1, align 8, !tbaa !9
  %30 = icmp ugt i64 %29, %28
  br i1 %30, label %17, label %31

31:                                               ; preds = %44, %26, %12
  ret void

32:                                               ; preds = %44, %15
  %33 = phi i64 [ %46, %44 ], [ 0, %15 ]
  %34 = phi i32 [ %45, %44 ], [ 0, %15 ]
  %35 = getelementptr inbounds i8**, i8*** %0, i64 %33
  %36 = load i8**, i8*** %35, align 8, !tbaa !15
  %37 = load i32, i32* %10, align 4, !tbaa !7
  %38 = mul i32 %37, %34
  %39 = zext i32 %38 to i64
  %40 = getelementptr inbounds i64, i64* %3, i64 %39
  %41 = getelementptr inbounds i64, i64* %4, i64 %39
  call void @query_group_by_template(i8** %36, i8* %2, i64* %40, i64* %41, i32* %5, i64* %7, i64** %8, i32 %34, i64* %11, i32* %6, i32* %9)
  br label %.error_check1

.error_check1:                                    ; preds = %32
  %42 = call i32 @get_error_code(i32* %9)
  %43 = icmp ne i32 %42, 0
  br i1 %43, label %.error_exit2, label %44

.error_exit2:                                     ; preds = %.error_check1
  call void @record_error_code(i32 %42, i32* %9)
  ret void

44:                                               ; preds = %.error_check1
  %45 = add i32 %34, 1
  %46 = zext i32 %45 to i64
  %47 = load i64, i64* %1, align 8, !tbaa !9
  %48 = icmp ugt i64 %47, %46
  br i1 %48, label %32, label %31
}

; Function Attrs: uwtable
define void @query_group_by_template(i8** nocapture readnone %byte_stream, i8* nocapture readonly %literals, i64* nocapture readnone %row_count_ptr, i64* nocapture readonly %frag_row_off_ptr, i32* %max_matched_ptr, i64* %agg_init_val, i64** %group_by_buffers, i32 %frag_idx, i64* %join_hash_tables, i32* %total_matched, i32* %error_code) #25 {
.entry:
  %0 = getelementptr i8*, i8** %byte_stream, i32 0
  %1 = load i8*, i8** %0
  %2 = getelementptr i8*, i8** %byte_stream, i32 1
  %3 = load i8*, i8** %2
  %4 = getelementptr i8*, i8** %byte_stream, i32 2
  %5 = load i8*, i8** %4
  %6 = getelementptr i8, i8* %literals, i16 0
  %7 = bitcast i8* %6 to i32*
  %literal_0 = load i32, i32* %7
  %row_count = load i64, i64* %row_count_ptr, align 8
  %8 = load i32, i32* %max_matched_ptr, align 8
  %crt_matched = alloca i32
  %old_total_matched = alloca i32
  %9 = call i32 @pos_start_impl(i32* %error_code)
  %10 = call i32 @pos_step_impl()
  %11 = call i32 @group_buff_idx_impl()
  %12 = sext i32 %9 to i64
  %13 = getelementptr i64*, i64** %group_by_buffers, i32 %11
  %col_buffer = load i64*, i64** %13, align 8
  %result_buffer = call i64* @init_shared_mem_nop(i64* %col_buffer, i32 0)
  %14 = icmp slt i64 %12, %row_count
  br i1 %14, label %.loop.preheader, label %.exit

.loop.preheader:                                  ; preds = %.entry
  %15 = sext i32 %10 to i64
  br label %.forbody

.forbody:                                         ; preds = %.forbody, %.loop.preheader
  %pos = phi i64 [ %12, %.loop.preheader ], [ %17, %.forbody ]
  %16 = call i32 @row_func_hoisted_literals(i64* %result_buffer, i64* null, i32* %crt_matched, i32* %total_matched, i32* %old_total_matched, i32* %max_matched_ptr, i64* %agg_init_val, i64 %pos, i64* %frag_row_off_ptr, i64* %row_count_ptr, i8* %literals, i8* %1, i8* %3, i8* %5, i64* %join_hash_tables, i32 %literal_0)
  %17 = add i64 %pos, %15
  %18 = icmp slt i64 %17, %row_count
  br i1 %18, label %.forbody, label %._crit_edge

._crit_edge:                                      ; preds = %.forbody
  br label %.exit

.exit:                                            ; preds = %._crit_edge, %.entry
  call void @write_back_nop(i64* %col_buffer, i64* %result_buffer, i32 0)
  ret void
}

; Function Attrs: alwaysinline
define i32 @row_func_hoisted_literals(i64* %group_by_buff, i64* %varlen_output_buff, i32* %crt_matched, i32* %total_matched, i32* %old_total_matched, i32* %max_matched, i64* %agg_init_val, i64 %pos, i64* %frag_row_off, i64* %num_rows_per_scan, i8* %literals, i8* %col_buf0, i8* %col_buf1, i8* %col_buf2, i64* %join_hash_tables, i32 %arg_literal_0) #26 {
entry:
  %loop_done = alloca i1
  br label %singleton_true_

exit:                                             ; preds = %singleton_true_, %loop_done_false
  ret i32 0

Here comes the join:

singleton_true_:                                  ; preds = %entry
  %0 = call i64 @fixed_width_int_decode(i8* %col_buf1, i32 4, i64 %pos)
  %1 = trunc i64 %0 to i32
  %2 = ptrtoint i64* %join_hash_tables to i64
  %3 = sext i32 %1 to i64
  %4 = call i64 @hash_join_idx(i64 %2, i64 %3, i64 0, i64 -1)
  %5 = icmp sge i64 %4, 0
  %remaining_outer_cond_match = alloca i1
  store i1 true, i1* %remaining_outer_cond_match
  %6 = load i1, i1* %remaining_outer_cond_match
  %7 = and i1 %5, %6
  br i1 %7, label %loop_body, label %exit

loop_body:                                        ; preds = %singleton_true_
  store i1 true, i1* %loop_done
  %8 = call i32 @filter_func_hoisted_literals(i8* %col_buf0, i64 %pos, i64* %group_by_buff, i1* %loop_done, i32 %arg_literal_0)
  %9 = load i1, i1* %loop_done
  br i1 %9, label %loop_done_true, label %loop_done_false

loop_done_true:                                   ; preds = %loop_body
  ret i32 %8

loop_done_false:                                  ; preds = %loop_body
  br label %exit
}

Here is the filtering of the records returned by the join; in this case, an equality:
icmp eq i32 %1, %arg_literal_0

; Function Attrs: alwaysinline
define i32 @filter_func_hoisted_literals(i8* %col_buf0, i64 %pos, i64* %group_by_buff, i1* %loop_done, i32 %arg_literal_0) #26 {
entry:
  %0 = call i64 @fixed_width_int_decode(i8* %col_buf0, i32 4, i64 %pos)
  %1 = trunc i64 %0 to i32
  %2 = icmp eq i32 %1, %arg_literal_0
  %3 = and i1 true, %2
  br i1 %3, label %filter_true, label %filter_false

filter_true:                                      ; preds = %entry
  %4 = sext i32 %1 to i64

and here is the group by, if the filtering succeeded:

  %5 = call i64* @get_group_value_fast_keyless(i64* %group_by_buff, i64 %4, i64 0, i64 0, i32 1)
  %6 = bitcast i64* %5 to i32*
  %agg_col_ptr = getelementptr i32, i32* %6, i32 0
  call void @agg_id_int32_shared(i32* %agg_col_ptr, i32 %1)
  %7 = bitcast i64* %5 to i32*
  %8 = getelementptr i32, i32* %7, i32 1
  %9 = atomicrmw add i32* %8, i32 1 monotonic
  br label %filter_false

filter_false:                                     ; preds = %filter_true, %entry
  store i1 false, i1* %loop_done
  ret i32 0
}
  2. In case there is an intermediate result, we have intermediate buffers, and if we find they aren't big enough, an additional query is run to better estimate the size (you should see the pre-flight name in the logs).

We have some parameters like this that you can see using the --dev-options switch instead of the regular --help:

--enable-bump-allocator [=arg(=1)] (=0)
                                        Enable the bump allocator for 
                                        projection queries on GPU. The bump 
                                        allocator will allocate a fixed size 
                                        buffer for each query, track the number
                                        of rows passing the kernel during query
                                        execution, and copy back only the rows 
                                        that passed the kernel to CPU after 
                                        execution. When disabled, pre-flight 
                                        count queries are used to size the 
                                        output buffer for projection queries.
  3. Replied in the first point: we collapse everything into the same kernel.
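The pre-flight sizing described in point 2 can be sketched like this (a toy CPU illustration with invented names, not HeavyDB code): run a cheap counting pass first so the output buffer can be allocated at exactly the right size before the real projection pass runs. This is the alternative to the bump allocator's fixed-size buffer.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Pre-flight sizing sketch: a counting pass sizes the output buffer,
// then the projection pass fills it. With the bump allocator instead,
// a fixed-size buffer would be allocated up front and only the rows
// that passed would be copied back.
std::vector<int> project_with_preflight(
    const std::vector<int>& input, const std::function<bool(int)>& pred) {
  std::size_t n = 0;
  for (int v : input) {          // pre-flight count query
    if (pred(v)) ++n;
  }
  std::vector<int> out;
  out.reserve(n);                // allocate the exact output size
  for (int v : input) {          // the real projection pass
    if (pred(v)) out.push_back(v);
  }
  return out;
}
```

The trade-off: pre-flight runs the predicate twice but never over-allocates, while the bump allocator runs once but must guess a buffer size.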

Regards,
Candido

Hi Candido, thanks for the reply, this is very helpful!

One more question: is there a way to dump the optimized DAG (the DAG after the collapse)? I tried adding the following line after heavydb/RelAlgDag.cpp at cde582ebc3edba3fb86bacefa5bd9b3418a367b4 · heavyai/heavydb · GitHub:
VLOG(1) << tree_string(rel_alg_dag.getRootNodeShPtr().get()) << "\n";
but I get the following output:

2022-07-05T22:54:50.312586 1 18865 0 8 RelAlgDag.cpp:3146 optimized DAG
2022-07-05T22:54:50.312624 1 18865 0 8 RelAlgDag.cpp:3148 &RelAlgNode
  &RelAlgNode
    &RelAlgNode
    &RelAlgNode

It would be helpful to be able to easily see which operators are collapsed. Thanks a lot!

Hi,

I don’t know that part of the code, so I can’t help you, but I can try to ask the engineers.

Candido

The problem has been fixed, no need to ask, thanks!

Hi,

could you share with the rest of the community how you fixed the problem?

Regards,
Candido