Class: DWH::Adapters::DuckDb

Inherits:
Adapter
  • Object
Defined in:
lib/dwh/adapters/duck_db.rb

Overview

DuckDb adapter.

This requires the Ruby DuckDB gem. Installation is a bit involved: follow the guide on the gem's page and make sure DuckDB itself is installed as required before installing the gem.

Generally, adapters should be created using DWH.create, with the configuration passed in as an options hash or argument list.

Examples:

Basic connection with only the required options

DWH.create(:duckdb, { file: 'path/to/my/duckdb' })

Open in read-only mode. See the DuckDB config docs for the available options.

DWH.create(:duckdb, { file: 'path/to/my/duckdb', duck_config: { access_mode: 'READ_ONLY' } })

Constant Summary

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Class Method Summary

Instance Method Summary

Methods inherited from Adapter

#adapter_name, #alter_settings, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

This class inherits a constructor from DWH::Adapters::Adapter

Class Method Details

.close_all ⇒ Object

DuckDB is an in-process database, so we don't want to open multiple instances of the same database in memory. Instead, each database is opened once and shared by many connections. Use this method to close them all.



# File 'lib/dwh/adapters/duck_db.rb', line 58

def self.close_all
  databases.each do |key, db|
    db.close
    databases.delete(key)
  end
end
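
The one-database, many-connections arrangement described above can be sketched without the DuckDB gem. This is a minimal illustration under stated assumptions, not the adapter's code: `SketchDatabase` and `SketchRegistry` are hypothetical stand-ins for `DuckDB::Database` and the adapter's class-level `databases` hash.

```ruby
# Hypothetical stand-in for DuckDB::Database (not the real gem class).
class SketchDatabase
  attr_reader :path, :connections_opened

  def initialize(path)
    @path = path
    @connections_opened = 0
    @closed = false
  end

  # Each call hands out a new lightweight connection token.
  def connect
    @connections_opened += 1
    "#{path}#conn-#{@connections_opened}"
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Class-level registry keyed by file path, mirroring .databases,
# .open_databases and .close_all.
class SketchRegistry
  def self.databases
    @databases ||= {}
  end

  def self.open_databases
    databases.size
  end

  # Reuse the database already opened for this path, or open a new one.
  def self.fetch(path)
    databases[path] ||= SketchDatabase.new(path)
  end

  # Close every registered database, then forget them all.
  def self.close_all
    databases.each_value(&:close)
    databases.clear
  end
end

first  = SketchRegistry.fetch('analytics.duckdb')
second = SketchRegistry.fetch('analytics.duckdb') # same object as `first`
first.connect
second.connect
SketchRegistry.close_all
```

Keying the registry by file path is what keeps two adapters that point at the same file from opening the database twice.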

.databases ⇒ Object



# File 'lib/dwh/adapters/duck_db.rb', line 46

def self.databases
  @databases ||= {}
end

.open_databases ⇒ Object



# File 'lib/dwh/adapters/duck_db.rb', line 50

def self.open_databases
  databases.size
end

Instance Method Details

#close ⇒ Object

This disconnects the current connection, but the database remains in process and can be reconnected to.

(see Adapter#close)



# File 'lib/dwh/adapters/duck_db.rb', line 70

def close
  connection.disconnect
  @connection = nil
end

#connection ⇒ Object

Creates a connection to the target database and returns the connection object or self



# File 'lib/dwh/adapters/duck_db.rb', line 23

def connection
  return @connection if @connection

  if self.class.databases.key?(config[:file])
    @db = self.class.databases[config[:file]]
  else
    ducked_config = DuckDB::Config.new
    if config.key?(:duck_config)
      config[:duck_config].each do |key, val|
        ducked_config[key.to_s] = val
      end
    end
    @db = DuckDB::Database.open(config[:file], ducked_config)
    self.class.databases[config[:file]] = @db
  end

  @connection = @db.connect

  @connection
rescue StandardError => e
  raise ConfigError, e.message
end
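
The `duck_config` handling above copies each option into the DuckDB config under a String key. A minimal sketch of just that step follows; `stringify_duck_config` is a hypothetical helper and a plain Hash stands in for `DuckDB::Config`.

```ruby
# Hypothetical helper: collect the :duck_config options under String keys.
# A plain Hash stands in for DuckDB::Config in this sketch.
def stringify_duck_config(config)
  ducked = {}
  # A missing :duck_config is treated as "no extra options".
  config.fetch(:duck_config, {}).each do |key, val|
    ducked[key.to_s] = val
  end
  ducked
end

options = stringify_duck_config(
  { file: 'path/to/my/duckdb', duck_config: { access_mode: 'READ_ONLY' } }
)
```

Because of the `to_s` call, Symbol and String option names in `duck_config` end up as the same key.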

#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

    • array returns an array of arrays
    • object returns an array of Hashes
    • csv returns the result as CSV
    • native returns the native result from whichever client is used
      • For example: Postgres using the pg client will return PG::Result
      • HTTP clients will return the HTTP response object
  • retries (Integer) (defaults to: 0)

    number of retries in case of failure. Default is 0

Returns:

  • (Array<Array>, Hash, CSV, Native)

Raises:

  • (ExecutionError)
  • (UnsupportedCapability)



# File 'lib/dwh/adapters/duck_db.rb', line 157

def execute(sql, format: :array, retries: 0)
  begin
    result = with_debug(sql) { with_retry(retries) { connection.query(sql) } }
  rescue StandardError => e
    raise ExecutionError, e.message
  end

  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array
    result.to_a
  when :object
    result_to_hash(result)
  when :csv
    result_to_csv(result)
  when :native
    result
  else
    raise UnsupportedCapability, "Unsupported format: #{format} for this #{name}"
  end
end
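
The format handling above can be tried in isolation. The sketch below is only an approximation under stated assumptions: `SketchResult` is a hypothetical stand-in for the native query result, and the `:object` and `:csv` branches are one plausible reading of `result_to_hash` and `result_to_csv`, whose bodies are not shown on this page.

```ruby
require 'csv'

# Hypothetical stand-in for the native query result.
class SketchResult
  attr_reader :column_names, :rows

  def initialize(column_names, rows)
    @column_names = column_names
    @rows = rows
  end

  def to_a
    rows
  end
end

# Normalise the requested format, then convert the result accordingly.
def format_result(result, format: :array)
  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array
    result.to_a
  when :object
    # One Hash per row, keyed by column name (assumed shape).
    result.rows.map { |row| result.column_names.zip(row).to_h }
  when :csv
    # Header line followed by one CSV line per row (assumed shape).
    CSV.generate do |csv|
      csv << result.column_names
      result.rows.each { |row| csv << row }
    end
  when :native
    result
  else
    raise ArgumentError, "Unsupported format: #{format}"
  end
end

sample = SketchResult.new(%w[id name], [[1, 'a'], [2, 'b']])
as_hashes = format_result(sample, format: 'OBJECT') # String formats are downcased
```

Accepting both Strings and Symbols for `format` lets callers pass values straight from configuration files or request parameters.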

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO

Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.

Parameters:

  • sql (String)

    actual sql

  • io (IO)

    IO object to write records to

  • stats (StreamingStats) (defaults to: nil)

optional object that collects stats and preview data

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure

Returns:

  • (IO)

Raises:

  • (ExecutionError)



# File 'lib/dwh/adapters/duck_db.rb', line 180

def execute_stream(sql, io, stats: nil, retries: 0)
  with_debug(sql) do
    with_retry(retries) do
      result = connection.query(sql)
      io.write(CSV.generate_line(result.columns.map(&:name)))
      result.each do |row|
        stats << row unless stats.nil?
        io.write(CSV.generate_line(row))
      end
    end
  end

  io.rewind
  io
rescue StandardError => e
  raise ExecutionError, e.message
end
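
The write loop above can be exercised without a database. In this sketch the rows come from a plain Array rather than a DuckDB result, and `SketchStats` is a hypothetical collector; the real `StreamingStats` interface is not shown on this page beyond its use of `<<`.

```ruby
require 'csv'
require 'stringio'

# Hypothetical collector that only counts rows.
class SketchStats
  attr_reader :row_count

  def initialize
    @row_count = 0
  end

  def <<(_row)
    @row_count += 1
    self
  end
end

# Write a header line and one CSV line per row, then rewind so the
# caller can read the IO from the start.
def stream_rows_as_csv(column_names, rows, io, stats: nil)
  io.write(CSV.generate_line(column_names))
  rows.each do |row|
    stats << row unless stats.nil?
    io.write(CSV.generate_line(row))
  end
  io.rewind
  io
end

out = stream_rows_as_csv(%w[id name], [[1, 'a'], [2, 'b, c']], StringIO.new)
```

Any object that responds to `write` and `rewind` works as the target, so the same call can fill a `StringIO`, a `File`, or a `Tempfile`.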

#metadata(table, **qualifiers) ⇒ DWH::Table

Get the schema structure of a given table_name. Pass in optional catalog and schema info.

Examples:

metadata("public.big_table")
metadata("big_table")
metadata("big_table", schema: "public")

Parameters:

  • table (String)
    • table name
  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:

  • (DWH::Table)



# File 'lib/dwh/adapters/duck_db.rb', line 124

def metadata(table, **qualifiers)
  db_table = Table.new table, **qualifiers
  sql = 'SELECT column_name, data_type, character_maximum_length, numeric_precision,numeric_scale FROM duckdb_columns'

  where = ["table_name = '#{db_table.physical_name}'"]
  where << "database_name = '#{db_table.catalog}'" if db_table.catalog

  where << if db_table.schema
             "schema_name = '#{db_table.schema}'"
           else
             "schema_name = '#{config[:schema]}'"
           end

  cols = execute("#{sql} WHERE #{where.join(' AND ')}")
  cols.each do |col|
    db_table << Column.new(
      name: col[0],
      data_type: col[1],
      precision: col[3],
      scale: col[4],
      max_char_length: col[2]
    )
  end

  db_table
end
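
To see which query the method above sends, the filter construction can be reproduced on its own. This is a sketch under stated assumptions: `metadata_sql` is a hypothetical helper, it takes an already resolved table name, catalog and schema instead of a `Table` object, and `'main'` is only a sample default schema.

```ruby
# Hypothetical helper that mirrors the WHERE-clause construction.
def metadata_sql(table_name:, default_schema:, catalog: nil, schema: nil)
  sql = 'SELECT column_name, data_type, character_maximum_length, ' \
        'numeric_precision, numeric_scale FROM duckdb_columns'

  where = ["table_name = '#{table_name}'"]
  # The catalog filter is added only when a catalog is given.
  where << "database_name = '#{catalog}'" if catalog
  # An explicit schema wins; otherwise the configured default is used.
  where << "schema_name = '#{schema || default_schema}'"

  "#{sql} WHERE #{where.join(' AND ')}"
end

query = metadata_sql(table_name: 'big_table', default_schema: 'main', schema: 'public')
```

The values are interpolated into the SQL text as they are, so in a pattern like this the table, catalog and schema names should come from trusted configuration rather than from end-user input.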

#schema? ⇒ Boolean

True if the configuration was set up with a schema.

Returns:

  • (Boolean)



# File 'lib/dwh/adapters/duck_db.rb', line 152

def schema?
  !config[:schema].nil? && !config[:schema]&.strip&.empty?
end

#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table

Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.

Examples:

stats("public.big_table", date_column: "fact_date")
stats("big_table")
stats("big_table",schema: "public")

Parameters:

  • table (String)

    table name

  • date_column (String) (defaults to: nil)

    optional date column to use to find range

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:

Raises:



# File 'lib/dwh/adapters/duck_db.rb', line 104

def stats(table, date_column: nil, **qualifiers)
  qualifiers[:schema] = config[:schema] unless qualifiers[:schema]
  db_table = Table.new table, **qualifiers

  sql = <<-SQL
  SELECT count(*) ROW_COUNT
  #{date_column.nil? ? nil : ", min(#{date_column}) DATE_START"}
  #{date_column.nil? ? nil : ", max(#{date_column}) DATE_END"}
  FROM #{db_table.fully_qualified_table_name}
  SQL

  result = execute(sql)
  TableStats.new(
    row_count: result.first[0],
    date_start: result.first[1],
    date_end: result.first[2]
  )
end
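
The query built above depends on whether a date column is given. The sketch below builds an equivalent statement on a single line; `stats_sql` is a hypothetical helper that takes the fully qualified table name as a plain String.

```ruby
# Hypothetical helper: row count, plus min/max of the date column when given.
def stats_sql(table, date_column: nil)
  select = ['count(*) ROW_COUNT']
  unless date_column.nil?
    select << "min(#{date_column}) DATE_START"
    select << "max(#{date_column}) DATE_END"
  end
  "SELECT #{select.join(', ')} FROM #{table}"
end

with_dates    = stats_sql('public.big_table', date_column: 'fact_date')
without_dates = stats_sql('public.big_table')
```

Without a date column the query selects a single value, so reading positions 1 and 2 of that row as a Ruby Array yields nil, which is presumably why date_start and date_end come back empty in that case.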

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Parameters:

  • sql (String)

    actual sql

Yields:

  • (chunk)

    Yields a streamed chunk as it streams in. The chunk type might vary depending on the target db and settings

Raises:



# File 'lib/dwh/adapters/duck_db.rb', line 199

def stream(sql, &block)
  with_debug(sql) do
    result = connection.query(sql)
    result.each do |row|
      block.call(row)
    end
  end
end
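
The row-by-row yielding above can be sketched with a plain Array in place of a query result. `SketchStreamer` is hypothetical, and the Enumerator fallback for calls without a block is an addition of this sketch, not something the method above does.

```ruby
# Hypothetical streamer that yields rows one at a time.
class SketchStreamer
  def initialize(rows)
    @rows = rows
  end

  # Yields each row to the block. Without a block, returns an Enumerator
  # (an addition in this sketch) so rows can still be consumed lazily.
  def stream
    return enum_for(:stream) unless block_given?

    @rows.each { |row| yield row }
  end
end

ids = []
SketchStreamer.new([[1, 'a'], [2, 'b']]).stream { |row| ids << row.first }
```

Processing each row inside the block means the caller never has to hold a converted copy of the whole result in memory.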

#tables(**qualifiers) ⇒ Array<String>

Get all tables available in the target db. It will use the default catalog and schema from the config unless they are specified here.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:

  • (Array<String>)


# File 'lib/dwh/adapters/duck_db.rb', line 86

def tables(**qualifiers)
  catalog, schema = qualifiers.values_at(:catalog, :schema)
  sql = 'SELECT table_name FROM duckdb_tables'

  where = []
  where << "database_name = '#{catalog}'" if catalog

  where << if schema
             "schema_name = '#{schema}'"
           else
             "schema_name = '#{config[:schema]}'"
           end

  res = execute("#{sql} WHERE #{where.join(' AND ')}")
  res.flatten
end

#test_connection(raise_exception: false) ⇒ Boolean

Tests the connection to the target database. Returns true if successful. On failure it raises an exception when raise_exception is true, and returns false otherwise.

Returns:

  • (Boolean)

Raises:

  • (ConnectionError)



# File 'lib/dwh/adapters/duck_db.rb', line 76

def test_connection(raise_exception: false)
  connection
  true
rescue StandardError => e
  raise ConnectionError, e.message if raise_exception

  false
end
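
The two failure modes above (return false, or raise) can be sketched as follows. `SketchConnectionError` and `check_connection` are hypothetical names; the block stands in for whatever opens the connection.

```ruby
# Hypothetical error class standing in for ConnectionError.
class SketchConnectionError < StandardError; end

# Runs the block that opens the connection. Returns true on success.
# On failure: re-raises as SketchConnectionError when asked to,
# otherwise returns false.
def check_connection(raise_exception: false)
  yield
  true
rescue StandardError => e
  raise SketchConnectionError, e.message if raise_exception

  false
end

healthy = check_connection { :connected }
broken  = check_connection { raise IOError, 'file is locked' }
```

The boolean form suits health checks, while `raise_exception: true` keeps the original error message for callers that want to report why the connection failed.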

#valid_config? ⇒ Boolean

Returns:

  • (Boolean)



# File 'lib/dwh/adapters/duck_db.rb', line 208

def valid_config?
  super
  require 'duckdb'
rescue LoadError
  raise ConfigError, "Required 'duckdb' gem missing. Please add it to your Gemfile."
end