Class: DWH::Adapters::DuckDb
- Defined in:
- lib/dwh/adapters/duck_db.rb
Overview
DuckDb adapter.
This requires the ruby DuckDb gem. Installation is a bit complex. Please follow the guide on the gems page to make sure you have DuckDb installed as required before installing the gem.
Generally, adapters should be created using DWH.create. Where a configuration is passed in as options hash or argument list.
Constant Summary
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Class Method Summary collapse
-
.close_all ⇒ Object
DuckDB is an in process database so we don't want to open multiple instances of the same db in memory.
- .databases ⇒ Object
- .open_databases ⇒ Object
Instance Method Summary collapse
-
#close ⇒ Object
This disconnects the current connection but the db is still in process and can be reconnected to.
-
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self.
-
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
-
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back.
-
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given a given table_name.
-
#schema? ⇒ Boolean
True if the configuration was setup with a schema.
-
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table.
-
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
-
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db.
-
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database and returns true if successful, or raise Exception or false connection object or self.
- #valid_config? ⇒ Boolean
Methods inherited from Adapter
#adapter_name, #alter_settings, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
This class inherits a constructor from DWH::Adapters::Adapter
Class Method Details
.close_all ⇒ Object
DuckDB is an in process database so we don't want to open multiple instances of the same db in memory. Rather, we open one instance but many connections. Use this method to close them all.
58 59 60 61 62 63 |
# File 'lib/dwh/adapters/duck_db.rb', line 58 def self.close_all databases.each do |key, db| db.close databases.delete(key) end end |
.databases ⇒ Object
46 47 48 |
# File 'lib/dwh/adapters/duck_db.rb', line 46 def self.databases @databases ||= {} end |
.open_databases ⇒ Object
50 51 52 |
# File 'lib/dwh/adapters/duck_db.rb', line 50 def self.open_databases databases.size end |
Instance Method Details
#close ⇒ Object
This disconnects the current connection but the db is still in process and can be reconnected to.
(see Adapter#close)
70 71 72 73 |
# File 'lib/dwh/adapters/duck_db.rb', line 70 def close connection.disconnect @connection = nil end |
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/dwh/adapters/duck_db.rb', line 23 def connection return @connection if @connection if self.class.databases.key?(config[:file]) @db = self.class.databases[config[:file]] else ducked_config = DuckDB::Config.new if config.key?(:duck_config) config[:duck_config].each do |key, val| ducked_config[key.to_s] = val end end @db = DuckDB::Database.open(config[:file], ducked_config) self.class.databases[config[:file]] = @db end @connection = @db.connect @connection rescue StandardError => e raise ConfigError, e. end |
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/dwh/adapters/duck_db.rb', line 157 def execute(sql, format: :array, retries: 0) begin result = with_debug(sql) { with_retry(retries) { connection.query(sql) } } rescue StandardError => e raise ExecutionError, e. end format = format.downcase if format.is_a?(String) case format.to_sym when :array result.to_a when :object result_to_hash(result) when :csv result_to_csv(result) when :native result else raise UnsupportedCapability, "Unsupported format: #{format} for this #{name}" end end |
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back. Data is writtent out in CSV format to the provided IO object.
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/dwh/adapters/duck_db.rb', line 180 def execute_stream(sql, io, stats: nil, retries: 0) with_debug(sql) do with_retry(retries) do result = connection.query(sql) io.write(CSV.generate_line(result.columns.map(&:name))) result.each do |row| stats << row unless stats.nil? io.write(CSV.generate_line(row)) end end end io.rewind io rescue StandardError => e raise ExecutionError, e. end |
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given a given table_name. Pass in optional catalog and schema info.
Example: metadata("public.big_table") metadata("big_table") metadata("big_table",schema: "public")
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/dwh/adapters/duck_db.rb', line 124 def (table, **qualifiers) db_table = Table.new table, **qualifiers sql = 'SELECT column_name, data_type, character_maximum_length, numeric_precision,numeric_scale FROM duckdb_columns' where = ["table_name = '#{db_table.physical_name}'"] where << "database_name = '#{db_table.catalog}'" if db_table.catalog where << if db_table.schema "schema_name = '#{db_table.schema}'" else "schema_name = '#{config[:schema]}'" end cols = execute("#{sql} WHERE #{where.join(' AND ')}") cols.each do |col| db_table << Column.new( name: col[0], data_type: col[1], precision: col[3], scale: col[4], max_char_length: col[2] ) end db_table end |
#schema? ⇒ Boolean
True if the configuration was setup with a schema.
152 153 154 |
# File 'lib/dwh/adapters/duck_db.rb', line 152 def schema? !config[:schema].nil? && !config[:schema]&.strip&.empty? end |
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/dwh/adapters/duck_db.rb', line 104 def stats(table, date_column: nil, **qualifiers) qualifiers[:schema] = config[:schema] unless qualifiers[:schema] db_table = Table.new table, **qualifiers sql = <<-SQL SELECT count(*) ROW_COUNT #{date_column.nil? ? nil : ", min(#{date_column}) DATE_START"} #{date_column.nil? ? nil : ", max(#{date_column}) DATE_END"} FROM #{db_table.fully_qualified_table_name} SQL result = execute(sql) TableStats.new( row_count: result.first[0], date_start: result.first[1], date_end: result.first[2] ) end |
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
199 200 201 202 203 204 205 206 |
# File 'lib/dwh/adapters/duck_db.rb', line 199 def stream(sql, &block) with_debug(sql) do result = connection.query(sql) result.each do |row| block.call(row) end end end |
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db. It will use the default catalog and schema config only specified here.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/dwh/adapters/duck_db.rb', line 86 def tables(**qualifiers) catalog, schema = qualifiers.values_at(:catalog, :schema) sql = 'SELECT table_name FROM duckdb_tables' where = [] where << "database_name = '#{catalog}'" if catalog where << if schema "schema_name = '#{schema}'" else "schema_name = '#{config[:schema]}'" end res = execute("#{sql} WHERE #{where.join(' AND ')}") res.flatten end |
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database and returns true if successful, or raise Exception or false connection object or self
76 77 78 79 80 81 82 83 |
# File 'lib/dwh/adapters/duck_db.rb', line 76 def test_connection(raise_exception: false) connection true rescue StandardError => e raise ConnectionError, e. if raise_exception false end |
#valid_config? ⇒ Boolean
208 209 210 211 212 213 |
# File 'lib/dwh/adapters/duck_db.rb', line 208 def valid_config? super require 'duckdb' rescue LoadError raise ConfigError, "Required 'duckdb' gem missing. Please add it to your Gemfile." end |