Class: DWH::Adapters::Adapter Abstract

Inherits:
Object
Extended by:
Settings
Includes:
Behaviors, Capabilities, Functions, Logger
Defined in:
lib/dwh/adapters.rb

Overview

This class is abstract.

Adapters base class. All adapters should inherit from this class and implement these core required methods: #connection, #test_connection, #tables, #metadata, #stats, #execute, #execute_stream, and #stream.

Adapter implementations can declare configuration options, their defaults, and whether each one is required, using the class-level config method. The configuration is validated when an adapter is instantiated, and a ConfigError is raised if there is an issue. Required methods that are not implemented raise NotImplementedError.

Additionally, if certain settings need to be overridden, you can add a settings file in a relative directory, such as settings/my_adapter.yml. Alternatively, you can specify an exact settings file location at the class level:

class MyAdapter < DWH::Adapters::Adapter
  settings_file_path "my_dir/my_settings.yml"
end

Examples:

class MyAdapter < DWH::Adapters::Adapter
  config :username, String, required: true, message: "login id of the current user"
  config :port, Integer, required: true, default: 5432
end
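The required-method contract can be illustrated without the gem. In this self-contained sketch, StandInAdapter is a hypothetical stand-in that models only the base class behavior of raising NotImplementedError for unimplemented methods, and InMemoryAdapter is a hypothetical subclass that serves rows from a Hash instead of a real database. Neither class is part of DWH.

```ruby
# Hypothetical stand-in for DWH::Adapters::Adapter: only the
# "raise NotImplementedError unless overridden" contract is modeled.
class StandInAdapter
  %i[connection test_connection tables metadata stats execute execute_stream stream].each do |name|
    define_method(name) do |*_args, **_kwargs, &_block|
      raise NotImplementedError, "#{self.class} has not implemented method '#{name}'"
    end
  end
end

# Hypothetical subclass backed by a Hash of table name => rows.
class InMemoryAdapter < StandInAdapter
  def initialize(data)
    @data = data
  end

  def test_connection(raise_exception: false)
    true
  end

  def tables(**_qualifiers)
    @data.keys
  end

  # Only understands the trivial query "SELECT * FROM <table>".
  def execute(sql, format: :array, retries: 0)
    table = sql[/\ASELECT \* FROM (\w+)\z/i, 1]
    raise ArgumentError, "unsupported sql: #{sql}" unless table && @data.key?(table)

    @data[table]
  end
end

adapter = InMemoryAdapter.new("events" => [[1, "click"], [2, "view"]])
adapter.tables                          # => ["events"]
adapter.execute("SELECT * FROM events") # => [[1, "click"], [2, "view"]]
```

Calling a method the subclass did not override, such as stats, still raises NotImplementedError. NotImplementedError is a ScriptError, so a plain `rescue => e` does not catch it.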

Direct Known Subclasses

Athena, Druid, DuckDb, MySql, Postgres, Snowflake, SqlServer, Sqlite, Trino

Constant Summary

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes included from Settings

#adapter_settings

Class Method Summary

Instance Method Summary

Methods included from Settings

load_settings, settings_file, settings_file_path, using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

#initialize(config) ⇒ Adapter

Returns a new instance of Adapter.



# File 'lib/dwh/adapters.rb', line 82

def initialize(config)
  @config = config.transform_keys(&:to_sym)
  # Per-instance customization of general settings, so you can have
  # multiple connections to Trino that exhibit different behavior.
  # Read :settings from the symbolized @config so string keys also work.
  @settings = self.class.adapter_settings.merge(
    (@config[:settings] || {}).transform_keys(&:to_sym)
  )

  valid_config?
end
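The per-instance override is a plain Hash#merge: class-level settings supply the defaults and the instance's :settings entry wins on conflicts. In this standalone sketch, CLASS_SETTINGS stands in for self.class.adapter_settings, and its keys (quote, temp_table_prefix) are hypothetical; the real keys come from the adapter's settings file. The sketch reads :settings from the already-symbolized hash, so a string "settings" key is honored too.

```ruby
# Hypothetical stand-in for self.class.adapter_settings.
CLASS_SETTINGS = { quote: '"%s"', temp_table_prefix: "tmp_" }.freeze

def instance_settings(config)
  config = config.transform_keys(&:to_sym)
  # Instance overrides win over class-level defaults on key conflicts.
  CLASS_SETTINGS.merge((config[:settings] || {}).transform_keys(&:to_sym))
end

settings = instance_settings({ "settings" => { "temp_table_prefix" => "scratch_" } })
settings[:temp_table_prefix] # => "scratch_"
settings[:quote]             # => "\"%s\""
```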

Instance Attribute Details

#config ⇒ Hash (readonly)

Instance-level configuration.

Returns:

  • (Hash)

    the actual instance configuration



# File 'lib/dwh/adapters.rb', line 80

def config
  @config
end

#settings ⇒ Hash (readonly)

These are the runtime settings the adapter uses once initialized; they can be overridden during initialization. Settings differ from configuration: settings control behavior and SQL syntax, while configuration determines how the adapter connects.

Returns:

  • (Hash)

    symbolized hash of settings



# File 'lib/dwh/adapters.rb', line 100

def settings
  @settings
end

Class Method Details

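.config(name, type, options = {}) ⇒ Symbol

Declares a configuration option the adapter needs to connect to and query the target database, and defines an instance reader method of the same name.

Parameters:

  • name (String, Symbol)

    name of the configuration

  • type (Constant)

    Ruby type of the configuration

  • options (Hash) (defaults to: {})

    options for the config

Options Hash (options):

  • :required (Boolean)

    Whether the option is required (defaults to false)

  • :default (*)

    The default value

  • :message (String)

    The error or informational message displayed (defaults to "Invalid or missing parameter: <name>")

  • :allowed (Array)

    List of allowed values (defaults to an empty array)

Returns:

  • (Symbol)

    the name of the defined reader method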


# File 'lib/dwh/adapters.rb', line 58

def self.config(name, type, options = {})
  configuration[name.to_sym] = {
    type: type,
    required: options[:required] || false,
    default: options[:default],
    message: options[:message] || "Invalid or missing parameter: #{name}",
    allowed: options[:allowed] || []
  }

  define_method(name.to_sym) do
    config[name.to_sym]
  end
end
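To show what .config and .configuration record, here is a self-contained re-creation of those two class methods on a hypothetical ConfigDSL module. The real validation (valid_config?) is not shown on this page, so the check in SketchAdapter#initialize (fill defaults, then raise a local ConfigError for missing required keys) is an assumption about its behavior, not the gem's code.

```ruby
# Hypothetical error class standing in for the gem's ConfigError.
class ConfigError < StandardError; end

# Re-creation of the class-level config DSL documented above.
module ConfigDSL
  def configuration
    @configuration ||= {}
  end

  def config(name, type, options = {})
    configuration[name.to_sym] = {
      type: type,
      required: options[:required] || false,
      default: options[:default],
      message: options[:message] || "Invalid or missing parameter: #{name}",
      allowed: options[:allowed] || []
    }
    # Defines an instance reader, e.g. #username, backed by @config.
    define_method(name.to_sym) { @config[name.to_sym] }
  end
end

class SketchAdapter
  extend ConfigDSL

  def initialize(config)
    @config = config.transform_keys(&:to_sym)
    # Assumed validation: fill defaults, then check required keys.
    self.class.configuration.each do |key, opts|
      @config[key] = opts[:default] if @config[key].nil? && !opts[:default].nil?
      raise ConfigError, opts[:message] if opts[:required] && @config[key].nil?
    end
  end
end

class MyAdapter < SketchAdapter
  config :username, String, required: true, message: "login id of the current user"
  config :port, Integer, required: true, default: 5432
end

MyAdapter.new({ username: "ana" }).port # => 5432
```

Because the defaults are stored per option, a required option with a default (such as port) never fails validation in this sketch.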

.configuration ⇒ Hash

Returns the class-level configuration options declared with .config, keyed by option name.

Returns:

  • (Hash)


# File 'lib/dwh/adapters.rb', line 74

def self.configuration
  @configuration ||= {}
end

Instance Method Details

#adapter_name ⇒ String

Returns the adapter name: the last segment of the class name, downcased.

Returns:

  • (String)


# File 'lib/dwh/adapters.rb', line 302

def adapter_name
  self.class.name.split('::').last.downcase
end
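Because adapter_name only downcases the last segment of the class name, multi-word class names are not converted to snake_case. A quick check of the same expression on plain strings (name_from_class is a hypothetical wrapper):

```ruby
# Same expression adapter_name applies to self.class.name.
def name_from_class(class_name)
  class_name.split("::").last.downcase
end

name_from_class("DWH::Adapters::Postgres")  # => "postgres"
name_from_class("DWH::Adapters::SqlServer") # => "sqlserver"
name_from_class("DWH::Adapters::DuckDb")    # => "duckdb"
```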

#alter_settings(changes = {}) ⇒ Hash

Allows an already instantiated adapter to change its current settings. This is useful when behavior needs to be modified at runtime. Calling it again first restores the original settings, so successive changes do not stack.

Returns:

  • (Hash)

    the complete settings with changes merged



# File 'lib/dwh/adapters.rb', line 106

def alter_settings(changes = {})
  reset_settings unless @original_settings.nil?
  @original_settings = @settings
  # merge (not merge!) so @original_settings keeps the pre-change values
  @settings = @settings.merge(changes)
end
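A standalone sketch of the alter/reset round trip. SettingsHolder is a hypothetical class that copies the two methods; it keeps the snapshot and the altered settings in separate Hash objects (merge rather than merge!), so reset_settings can restore the earlier values.

```ruby
# Hypothetical holder reproducing the alter/reset round trip.
class SettingsHolder
  attr_reader :settings

  def initialize(settings)
    @settings = settings
  end

  def alter_settings(changes = {})
    reset_settings unless @original_settings.nil?
    @original_settings = @settings
    # merge returns a new Hash, leaving the snapshot untouched.
    @settings = @settings.merge(changes)
  end

  def reset_settings
    @settings = @original_settings if @original_settings
  end
end

holder = SettingsHolder.new({ temp_table_prefix: "tmp_" })
holder.alter_settings({ temp_table_prefix: "scratch_" })
holder.settings[:temp_table_prefix] # => "scratch_"
holder.reset_settings
holder.settings[:temp_table_prefix] # => "tmp_"
```

Since each alter_settings call first resets to the snapshot, calling it twice applies only the second set of changes.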

#close ⇒ Object

Close the connection if it was created.



# File 'lib/dwh/adapters.rb', line 149

def close
  @connection&.close
  @connection = nil
end

#connect! ⇒ Boolean

Tests the connection and raises an exception if it fails.

Returns:

  • (Boolean)

    true if the connection succeeds

Raises:

  • an exception if the connection fails



# File 'lib/dwh/adapters.rb', line 138

def connect!
  test_connection(raise_exception: true)
end

#connect? ⇒ Boolean

Tests whether the database can be connected to.

Returns:

  • (Boolean)

    true if the connection succeeds, otherwise false



# File 'lib/dwh/adapters.rb', line 144

def connect?
  test_connection(raise_exception: false)
end

#connection ⇒ Object

Creates a connection to the target database and returns the connection object or self.

Raises:

  • (NotImplementedError)


# File 'lib/dwh/adapters.rb', line 121

def connection
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
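Since close operates on @connection, an implementation is expected to cache its client there. A minimal sketch of lazy creation plus release, assuming a hypothetical FakeClient in place of a real driver such as pg; LazyConnectionAdapter is also hypothetical.

```ruby
# Hypothetical client standing in for a real database driver.
class FakeClient
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

class LazyConnectionAdapter
  # Build the client on first use and reuse it afterwards.
  def connection
    @connection ||= FakeClient.new
  end

  # Same body as the base class #close.
  def close
    @connection&.close
    @connection = nil
  end
end

adapter = LazyConnectionAdapter.new
first = adapter.connection
adapter.connection.equal?(first) # => true (cached)
adapter.close
first.closed                     # => true
adapter.connection.equal?(first) # => false (a new client is built)
```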

#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

    • :array returns an array of arrays
    • :object returns an array of Hashes
    • :csv returns the results as CSV
    • :native returns the native result from the underlying client
      • For example, Postgres using the pg client returns a PG::Result
      • HTTP-based clients return the HTTP response object
  • retries (Integer) (defaults to: 0)

    number of retries in case of failure

Returns:

  • (Array<Array>, Array<Hash>, CSV, Native)

Raises:

  • (NotImplementedError)

    if the subclass does not implement it



# File 'lib/dwh/adapters.rb', line 167

def execute(sql, format: :array, retries: 0)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
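One way an implementation might map a driver's column names and row arrays onto the documented :array, :object, and :csv formats. format_rows is a hypothetical helper, not part of the gem; :native is omitted because it returns the driver's own result unchanged. Whether :object uses string or symbol keys is not specified on this page; the sketch uses strings.

```ruby
require "csv"

# Hypothetical helper: columns is an Array<String>, rows an Array<Array>.
def format_rows(columns, rows, format = :array)
  case format.to_sym
  when :array
    rows
  when :object
    # Each row becomes a Hash keyed by column name.
    rows.map { |row| columns.zip(row).to_h }
  when :csv
    CSV.generate do |csv|
      csv << columns
      rows.each { |row| csv << row }
    end
  else
    raise ArgumentError, "unsupported format: #{format}"
  end
end

columns = %w[id name]
rows = [[1, "a"], [2, "b,c"]]
format_rows(columns, rows, :object)
format_rows(columns, rows, "csv") # => "id,name\n1,a\n2,\"b,c\"\n"
```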

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO

Executes the SQL and streams the results back. Data is written in CSV format to the provided IO object.

Parameters:

  • sql (String)

    actual sql

  • io (IO)

    IO object to write records to

  • stats (StreamingStats) (defaults to: nil)

    optional; collects stats and preview data

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure

Returns:

  • (IO)

Raises:

  • (NotImplementedError)



# File 'lib/dwh/adapters.rb', line 180

def execute_stream(sql, io, stats: nil, retries: 0)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
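A sketch of the documented execute_stream contract: rows are written to the IO as CSV lines as they arrive, and the same IO is returned. write_stream is hypothetical; an Enumerator stands in for the driver's row stream, and the optional stats object is left out because StreamingStats is not described here.

```ruby
require "csv"
require "stringio"

# Hypothetical helper: rows is any Enumerable of Array rows, such as a driver cursor.
def write_stream(rows, io)
  # Write each row as one CSV line as soon as it is available.
  rows.each { |row| io << CSV.generate_line(row) }
  io
end

# An Enumerator stands in for rows arriving from the database.
rows = Enumerator.new do |y|
  y << [1, "click"]
  y << [2, "view"]
end

out = write_stream(rows, StringIO.new)
out.string # => "1,click\n2,view\n"
```

Writing line by line keeps memory flat for large result sets, since no full result array is built.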

#extra_connection_params ⇒ Hash

Returns any extra connection params passed in the config under :extra_connection_params.

Returns:

  • (Hash)

    an empty hash if none were given



# File 'lib/dwh/adapters.rb', line 310

def extra_connection_params
  config[:extra_connection_params] || {}
end

#extra_query_params ⇒ Hash

Returns any extra query params passed in the config under :extra_query_params. Adapters that support them pass these on to the query executor.

Returns:

  • (Hash)

    an empty hash if none were given



# File 'lib/dwh/adapters.rb', line 318

def extra_query_params
  config[:extra_query_params] || {}
end

#metadata(table, **qualifiers) ⇒ DWH::Table

Gets the schema structure of the given table. Optionally pass catalog and schema qualifiers.

Examples:

metadata("public.big_table")
metadata("big_table")
metadata("big_table", schema: "public")

Parameters:

  • table (String)

    table name
  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace; ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to; ignored if the adapter doesn't support it

Returns:

  • (DWH::Table)

Raises:

  • (NotImplementedError)


# File 'lib/dwh/adapters.rb', line 253

def metadata(table, **qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end

#reset_settings ⇒ Hash

Restores the settings to their state before alter_settings was called.

Returns:

  • (Hash)

    the original settings (nil if alter_settings was never called)



# File 'lib/dwh/adapters.rb', line 115

def reset_settings
  @settings = @original_settings if @original_settings
end

#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table

Returns basic stats for the given table, typically including row_count, date_start, and date_end.

Examples:

stats("public.big_table", date_column: "fact_date")
stats("big_table")
stats("big_table", schema: "public")

Parameters:

  • table (String)

    table name

  • date_column (String) (defaults to: nil)

    optional date column used to find the date range

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace; ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to; ignored if the adapter doesn't support it

Returns:

  • (DWH::Table)

Raises:

  • (NotImplementedError)



# File 'lib/dwh/adapters.rb', line 211

def stats(table, date_column: nil, **qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Parameters:

  • sql (String)

    actual sql

Yields:

  • (chunk)

    Yields each chunk as it streams in. The chunk type may vary depending on the target database and settings.

Raises:

  • (NotImplementedError)



# File 'lib/dwh/adapters.rb', line 191

def stream(sql, &block)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end

#table?(table, **qualifiers) ⇒ Boolean

Checks whether the table exists in the target database.

Parameters:

  • table (String)

    table name

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace; ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to; ignored if the adapter doesn't support it

Returns:

  • (Boolean)



# File 'lib/dwh/adapters.rb', line 235

def table?(table, **qualifiers)
  tables(**qualifiers).include?(table)
end
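Because the base table? is tables(**qualifiers).include?(table), it is an exact, case-sensitive string comparison against whatever names tables returns, and it lists every table on each call. A standalone illustration with a hypothetical stub for tables:

```ruby
# Hypothetical stub: pretend the database reports these table names.
def tables(**_qualifiers)
  ["big_table", "Events"]
end

# Same body as the base class #table?.
def table?(table, **qualifiers)
  tables(**qualifiers).include?(table)
end

table?("big_table")        # => true
table?("events")           # => false (case differs)
table?("public.big_table") # => false (qualified names are not stripped)
```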

#tables(**qualifiers) ⇒ Array<String>

Gets all tables available in the target database. Uses the default catalog and schema from the config unless they are specified here.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace; ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to; ignored if the adapter doesn't support it

Returns:

  • (Array<String>)

Raises:

  • (NotImplementedError)


# File 'lib/dwh/adapters.rb', line 224

def tables(**qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end

#test_connection(raise_exception: false) ⇒ Boolean

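Tests the connection to the target database. Returns true if successful; on failure, raises an exception when raise_exception is true and returns false otherwise.

Returns:

  • (Boolean)

Raises:

  • (NotImplementedError)

    if the subclass does not implement it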



# File 'lib/dwh/adapters.rb', line 130

def test_connection(raise_exception: false)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
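A sketch of how a subclass might honor raise_exception, and how connect? and connect! then behave. ProbeAdapter and its healthy flag are hypothetical, and a RuntimeError stands in for whatever error the real driver raises.

```ruby
class ProbeAdapter
  def initialize(healthy)
    @healthy = healthy
  end

  def test_connection(raise_exception: false)
    raise "connection refused" unless @healthy # stand-in for a driver ping

    true
  rescue StandardError
    # Re-raise the original error only when the caller asked for it.
    raise if raise_exception

    false
  end

  # Same bodies as the base class.
  def connect!
    test_connection(raise_exception: true)
  end

  def connect?
    test_connection(raise_exception: false)
  end
end

ProbeAdapter.new(true).connect?  # => true
ProbeAdapter.new(false).connect? # => false
# ProbeAdapter.new(false).connect! would raise RuntimeError "connection refused"
```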

#token_expired? ⇒ Boolean

For adapters that use access tokens (JWT or OAuth), returns whether the token has expired. Also returns true if no expiry time has been recorded.

Returns:

  • (Boolean)



# File 'lib/dwh/adapters.rb', line 325

def token_expired?
  @token_expires_at.nil? || Time.now >= @token_expires_at
end
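token_expired? treats a missing expiry as expired, so an adapter is expected to set @token_expires_at when it obtains a token. In this standalone sketch, TokenHolder, store_token, and the 60-second safety margin are assumptions, not gem behavior.

```ruby
class TokenHolder
  # Hypothetical: record the expiry from an OAuth-style expires_in (seconds),
  # minus a safety margin so the token is refreshed slightly early.
  def store_token(expires_in, margin: 60)
    @token_expires_at = Time.now + expires_in - margin
  end

  # Same body as the base class #token_expired?.
  def token_expired?
    @token_expires_at.nil? || Time.now >= @token_expires_at
  end
end

holder = TokenHolder.new
holder.token_expired? # => true (no token yet)
holder.store_token(3600)
holder.token_expired? # => false
```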

#with_debug(sql, &block) ⇒ Object

Wraps an SQL execution with debug logging, including the execution time.

Parameters:

  • sql (String)

    actual sql being executed

Returns:

  • (Object)

    the block's result, typically the execution results (see #execute)



# File 'lib/dwh/adapters.rb', line 286

def with_debug(sql, &block)
  starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  logger.debug("=== SQL === \n#{sql}")

  result = block.call

  ending = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = ending - starting
  logger.debug("=== FINISHED SQL (#{elapsed.round(1)} secs) ===")

  result
end
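A standalone copy of with_debug, wired to Ruby's standard Logger writing into a StringIO so the log lines can be inspected. DebugDemo is hypothetical, and the gem's own Logger mixin is not shown on this page, so its log format may differ.

```ruby
require "logger"
require "stringio"

class DebugDemo
  attr_reader :logger

  def initialize(io)
    @logger = Logger.new(io, level: Logger::DEBUG)
  end

  # Same body as the base class #with_debug.
  def with_debug(sql, &block)
    starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    logger.debug("=== SQL === \n#{sql}")
    result = block.call
    ending = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    elapsed = ending - starting
    logger.debug("=== FINISHED SQL (#{elapsed.round(1)} secs) ===")
    result
  end
end

log = StringIO.new
demo = DebugDemo.new(log)
rows = demo.with_debug("SELECT 1") { [[1]] }
# rows is the block's result; log.string now holds both debug lines.
```

Because elapsed is rounded to one decimal place, queries faster than about 50 ms are logged as 0.0 secs.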

#with_retry(max_attempts = 2, &block) ⇒ Object

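Calls the block, retrying on StandardError until max_attempts total attempts have been made, then re-raises the last error. If max_attempts is 0, the block is called once without retry handling.

Parameters:

  • max_attempts (Integer) (defaults to: 2)

    maximum number of attempts, including the first call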



# File 'lib/dwh/adapters.rb', line 262

def with_retry(max_attempts = 2, &block)
  return block.call if max_attempts.zero?

  attempts = 0

  begin
    attempts += 1
    block.call
  rescue StandardError => e
    if attempts < max_attempts
      logger.warn "Attempt #{attempts} failed with error: #{e.message}. Retrying..."
      retry
    else
      logger.error "Failed after #{attempts} attempts with error: #{e.message}"
      raise
    end
  end
end
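A standalone check of the attempt counting: max_attempts counts the first call, so with_retry(3) makes at most three calls. RetryDemo and the flaky block are illustrative; the logger is Ruby's standard Logger writing to a StringIO.

```ruby
require "logger"
require "stringio"

class RetryDemo
  attr_reader :logger

  def initialize
    @logger = Logger.new(StringIO.new)
  end

  # Same body as the base class #with_retry.
  def with_retry(max_attempts = 2, &block)
    return block.call if max_attempts.zero?

    attempts = 0
    begin
      attempts += 1
      block.call
    rescue StandardError => e
      if attempts < max_attempts
        logger.warn "Attempt #{attempts} failed with error: #{e.message}. Retrying..."
        retry
      else
        logger.error "Failed after #{attempts} attempts with error: #{e.message}"
        raise
      end
    end
  end
end

calls = 0
result = RetryDemo.new.with_retry(3) do
  calls += 1
  raise IOError, "transient" if calls < 3
  "ok"
end
# Two failures, then success on the third call: result is "ok", calls is 3.
```

Retries happen immediately, with no delay or backoff between attempts, and errors outside StandardError (such as NotImplementedError) are not retried.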