|
| 1 | +# frozen_string_literal: true |
| 2 | + |
| 3 | +# rubocop:disable Style/OneClassPerFile |
| 4 | +require 'duckdb' |
| 5 | +require 'polars-df' |
| 6 | +require 'tmpdir' |
| 7 | + |
# Exposes a Polars::DataFrame to DuckDB as a table function, emitting a
# single row per execute callback. Simplest adapter; slowest, because every
# row costs one Ruby<->C crossing.
class PolarsDataFrameTableAdapter
  # Registers +data_frame+ under the table-function name +name+.
  # When +columns+ is nil, every column is declared as VARCHAR.
  def call(data_frame, name, columns: nil)
    columns ||= infer_columns(data_frame)
    DuckDB::TableFunction.create(name:, columns:, &execute_block(data_frame))
  end

  private

  # Builds the execute callback. The closure keeps a row cursor so the
  # function can be scanned repeatedly: once exhausted it reports 0 rows
  # and rewinds the cursor for the next scan.
  def execute_block(data_frame)
    row_cursor = 0
    total_rows = data_frame.height
    column_count = data_frame.width
    proc do |_func_info, output|
      if row_cursor >= total_rows
        row_cursor = 0
        next 0 # end of scan; cursor rewound for a future scan
      end
      # Each call produces exactly one output row, so the output row index
      # is always 0.
      column_count.times do |col_idx|
        output.set_value(col_idx, 0, data_frame[row_cursor, col_idx])
      end
      row_cursor += 1
      1 # one row produced this call
    end
  end

  # Declares every column as VARCHAR — the real dtypes are not inspected here.
  def infer_columns(data_frame)
    data_frame.columns.each_with_object({}) do |header, mapping|
      mapping[header] = DuckDB::LogicalType::VARCHAR
    end
  end
end
| 37 | + |
# Batch approach: write BATCH_SIZE rows per execute call to reduce Ruby<->C crossings
class PolarsDataFrameBatchTableAdapter
  BATCH_SIZE = 2048

  # Registers +data_frame+ under the table-function name +name+.
  # When +columns+ is nil, every column is declared as VARCHAR.
  def call(data_frame, name, columns: nil)
    columns ||= infer_columns(data_frame)
    DuckDB::TableFunction.create(name:, columns:, &execute_block(data_frame))
  end

  private

  # Builds the execute callback. Each call copies up to BATCH_SIZE rows and
  # returns the number copied; a 0 return signals end of scan and rewinds
  # the cursor so the function can be scanned again.
  def execute_block(data_frame)
    cursor = 0
    total_rows = data_frame.height
    column_count = data_frame.width
    proc do |_func_info, output|
      if cursor >= total_rows
        cursor = 0
        next 0
      end
      chunk = [total_rows - cursor, BATCH_SIZE].min
      copy_rows(data_frame, output, cursor, chunk, column_count)
      cursor += chunk
      chunk
    end
  end

  # Copies +chunk+ rows starting at +cursor+ into the output chunk,
  # row-major (all columns of a row before the next row).
  def copy_rows(data_frame, output, cursor, chunk, column_count)
    chunk.times do |row_idx|
      column_count.times do |col_idx|
        output.set_value(col_idx, row_idx, data_frame[cursor + row_idx, col_idx])
      end
    end
  end

  # Declares every column as VARCHAR — the real dtypes are not inspected here.
  def infer_columns(data_frame)
    data_frame.columns.each_with_object({}) do |header, mapping|
      mapping[header] = DuckDB::LogicalType::VARCHAR
    end
  end
end
| 73 | + |
# Optimized batch approach: pre-extract columns as Ruby arrays to avoid
# repeated Polars FFI calls, and use assign_string_element to skip type dispatch
class PolarsDataFrameOptimizedTableAdapter
  BATCH_SIZE = 2048

  # Registers +data_frame+ under the table-function name +name+.
  # When +columns+ is nil, every column is declared as VARCHAR.
  def call(data_frame, name, columns: nil)
    columns ||= infer_columns(data_frame)
    DuckDB::TableFunction.create(name:, columns:, &execute_block(data_frame))
  end

  private

  # rubocop:disable Metrics/MethodLength
  # Builds the execute callback. Columns are materialized once up front; each
  # call then copies up to BATCH_SIZE rows straight into the output vectors.
  # A 0 return signals end of scan and rewinds the cursor for the next scan.
  def execute_block(data_frame)
    materialized = extract_columns(data_frame)
    cursor = 0
    total_rows = data_frame.height
    column_count = data_frame.width
    proc do |_func_info, output|
      if cursor >= total_rows
        cursor = 0
        next 0
      end
      chunk = [total_rows - cursor, BATCH_SIZE].min
      # Vectors must be re-fetched per call: they point into the current chunk.
      vectors = Array.new(column_count) { |i| output.get_vector(i) }
      fill_vectors(materialized, vectors, cursor, chunk)
      cursor += chunk
      chunk
    end
  end
  # rubocop:enable Metrics/MethodLength

  # Casts every column to Utf8 and pulls it out as one plain Ruby array,
  # so the hot loop below never touches Polars again.
  def extract_columns(data_frame)
    data_frame.columns.map { |col| data_frame[col].cast(Polars::Utf8).to_a }
  end

  # Writes +chunk+ rows starting at +cursor+, column by column.
  # NOTE(review): nil cells become "" via to_s rather than SQL NULL — confirm
  # that is acceptable for this benchmark.
  def fill_vectors(materialized, vectors, cursor, chunk)
    materialized.zip(vectors).each do |col_data, vector|
      chunk.times { |i| vector.assign_string_element(i, col_data[cursor + i].to_s) }
    end
  end

  # Declares every column as VARCHAR — matches the Utf8 cast above.
  def infer_columns(data_frame)
    data_frame.columns.each_with_object({}) do |header, mapping|
      mapping[header] = DuckDB::LogicalType::VARCHAR
    end
  end
end
| 119 | + |
# Loads +data_frame+ into DuckDB by round-tripping through a Parquet file:
# Polars writes the file, DuckDB ingests it via read_parquet into table
# +name+, and the whole table is read back as an array of rows.
# NOTE(review): +name+ and +parquet_path+ are interpolated into SQL; fine for
# this local benchmark with fixed inputs, but never pass untrusted values.
def query_via_parquet(con, data_frame, name, parquet_path)
  data_frame.write_parquet(parquet_path)
  create_sql = "CREATE OR REPLACE TABLE #{name} AS SELECT * FROM read_parquet('#{parquet_path}')"
  con.query(create_sql)
  con.query("SELECT * FROM #{name}").to_a
end
| 125 | + |
# Measures the wall-clock duration of the given block in seconds, using the
# monotonic clock (immune to system-clock adjustments).
def measure
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

ROW_COUNT = 100_000

df = Polars::DataFrame.new(
  {
    id: (1..ROW_COUNT).to_a,
    name: (1..ROW_COUNT).map { |i| "Name#{i}" },
    age: Array.new(ROW_COUNT) { rand(0..100) }
  }
)

db = DuckDB::Database.open
con = db.connect
# Single-threaded so all approaches are compared on equal footing.
con.query('SET threads=1')

# 1. One row per execute callback.
DuckDB::TableFunction.add_table_adapter(Polars::DataFrame, PolarsDataFrameTableAdapter.new)
row_at_a_time_secs = measure do
  con.expose_as_table(df, 'polars_tf')
  con.query('SELECT * FROM polars_tf()').to_a
end

# 2. BATCH_SIZE rows per execute callback.
DuckDB::TableFunction.add_table_adapter(Polars::DataFrame, PolarsDataFrameBatchTableAdapter.new)
batched_secs = measure do
  con.expose_as_table(df, 'polars_tf_batch')
  con.query('SELECT * FROM polars_tf_batch()').to_a
end

# 3. Batched with columns pre-extracted to Ruby arrays.
DuckDB::TableFunction.add_table_adapter(Polars::DataFrame, PolarsDataFrameOptimizedTableAdapter.new)
optimized_secs = measure do
  con.expose_as_table(df, 'polars_tf_opt')
  con.query('SELECT * FROM polars_tf_opt()').to_a
end

# 4. Round trip through a Parquet file.
parquet_path = File.join(Dir.tmpdir, 'issue922_benchmark.parquet')
parquet_secs = measure { query_via_parquet(con, df, 'polars_pq', parquet_path) }

con.close
db.close
File.delete(parquet_path)

puts "Time taken for table function approach (1 row/call): #{row_at_a_time_secs} seconds"
puts "Time taken for table function approach (batch/call): #{batched_secs} seconds"
puts "Time taken for table function approach (batch + pre-extract): #{optimized_secs} seconds"
puts "Time taken for parquet file approach: #{parquet_secs} seconds"
# rubocop:enable Style/OneClassPerFile