Class: DWH::Adapters::Adapter Abstract
- Inherits:
-
Object
- Object
- DWH::Adapters::Adapter
- Extended by:
- Settings
- Includes:
- Behaviors, Capabilities, Functions, Logger
- Defined in:
- lib/dwh/adapters.rb
Overview
Adapters base class. All adapters should inherit from this class and implement these core required methods:
Adapter implementations can declare configuration options, defaults, and whether it is required. This is a class level method. They will be validated and a ConfigError will be raised if there is an issue. Methods not implemented will raise NotImplementedError
Additionally, if certain setting need to be overridden you can add a settings file in a relative directory like so: settings/my_adapter.yml. Or, you can specify an exact settings file location at the class level:
class MyAdapter < DWH::Adapters::Adapter settings_file_path "my_dir/my_settings.yml" end
Constant Summary
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary collapse
-
#config ⇒ Hash
readonly
Instance level configurations.
-
#settings ⇒ Hash
readonly
This is the actual runtime settings used by the adapter once initialized.
Attributes included from Settings
Class Method Summary collapse
-
.config(name, type, options = {}) ⇒ Hash
Define the configurations required for the adapter to connect and query target database.
-
.configuration ⇒ Hash
Get the adapter class level configuration settings.
Instance Method Summary collapse
-
#adapter_name ⇒ String
Adapter name from the class name.
-
#alter_settings(changes = {}) ⇒ Hash
Allows an already instantiated adapter to change its current settings.
-
#close ⇒ Object
Close the connection if it was created.
-
#connect! ⇒ Boolean
Test connection and raise exception if connection fails.
-
#connect? ⇒ Boolean
Tests whether the dtabase can be connected.
-
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self.
-
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
-
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back.
-
#extra_connection_params ⇒ Hash
If any extra connection params were passed in the config object, this will return it.
-
#extra_query_params ⇒ Hash
If the adapter supports it, will pass on extra query params from the config to the executor.
-
#initialize(config) ⇒ Adapter
constructor
A new instance of Adapter.
-
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given a given table_name.
-
#reset_settings ⇒ Hash
This returns settings back to its original state prior to running alter_settings.
-
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table.
-
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
-
#table?(table, **qualifiers) ⇒ Boolean
Check if table exists in remote db.
-
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db.
-
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database and returns true if successful, or raise Exception or false connection object or self.
-
#token_expired? ⇒ Boolean
For adapters that uses access tokens via jwt or oauth this will return wether a tokens is expired.
-
#with_debug(sql, &block) ⇒ Object
Wraps an SQL execution with debug logging.
-
#with_retry(max_attempts = 2, &block) ⇒ Object
Will call the block with retries given by the max attempts param.
Methods included from Settings
load_settings, settings_file, settings_file_path, using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
#initialize(config) ⇒ Adapter
Returns a new instance of Adapter.
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/dwh/adapters.rb', line 82 def initialize(config) @config = config.transform_keys(&:to_sym) # Per instance customization of general settings # So you can have multiple connections to Trino # but exhibit diff behavior @settings = self.class.adapter_settings.merge( (config[:settings] || {}).transform_keys(&:to_sym) ) valid_config? end |
Instance Attribute Details
#config ⇒ Hash (readonly)
Instance level configurations
80 81 82 |
# File 'lib/dwh/adapters.rb', line 80 def config @config end |
#settings ⇒ Hash (readonly)
This is the actual runtime settings used by the adapter once initialized. During intialization settings could be overridden. Settings are different from configuration in that settings control behaviour and syntax while configuration determines how we connect.
100 101 102 |
# File 'lib/dwh/adapters.rb', line 100 def settings @settings end |
Class Method Details
.config(name, type, options = {}) ⇒ Hash
Define the configurations required for the adapter to connect and query target database.
58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/dwh/adapters.rb', line 58 def self.config(name, type, = {}) configuration[name.to_sym] = { type: type, required: [:required] || false, default: [:default], message: [:message] || "Invalid or missing parameter: #{name}", allowed: [:allowed] || [] } define_method(name.to_sym) do config[name.to_sym] end end |
.configuration ⇒ Hash
Get the adapter class level configuration settings
74 75 76 |
# File 'lib/dwh/adapters.rb', line 74 def self.configuration @configuration ||= {} end |
Instance Method Details
#adapter_name ⇒ String
Adapter name from the class name
302 303 304 |
# File 'lib/dwh/adapters.rb', line 302 def adapter_name self.class.name.split('::').last.downcase end |
#alter_settings(changes = {}) ⇒ Hash
Allows an already instantiated adapter to change its current settings. this might be useful in situations where behavior needs to be modified on runtime basis.
106 107 108 109 110 |
# File 'lib/dwh/adapters.rb', line 106 def alter_settings(changes = {}) reset_settings unless @original_settings.nil? @original_settings = @settings @settings.merge!(changes) end |
#close ⇒ Object
Close the connection if it was created.
149 150 151 152 |
# File 'lib/dwh/adapters.rb', line 149 def close @connection&.close @connection = nil end |
#connect! ⇒ Boolean
Test connection and raise exception if connection fails.
138 139 140 |
# File 'lib/dwh/adapters.rb', line 138 def connect! test_connection(raise_exception: true) end |
#connect? ⇒ Boolean
Tests whether the dtabase can be connected
144 145 146 |
# File 'lib/dwh/adapters.rb', line 144 def connect? test_connection(raise_exception: false) end |
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self
121 122 123 |
# File 'lib/dwh/adapters.rb', line 121 def connection raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'" end |
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
167 168 169 |
# File 'lib/dwh/adapters.rb', line 167 def execute(sql, format: :array, retries: 0) raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'" end |
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back. Data is writtent out in CSV format to the provided IO object.
180 181 182 |
# File 'lib/dwh/adapters.rb', line 180 def execute_stream(sql, io, stats: nil, retries: 0) raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'" end |
#extra_connection_params ⇒ Hash
If any extra connection params were passed in the config object, this will return it.
310 311 312 |
# File 'lib/dwh/adapters.rb', line 310 def extra_connection_params config[:extra_connection_params] || {} end |
#extra_query_params ⇒ Hash
If the adapter supports it, will pass on extra query params from the config to the executor.
318 319 320 |
# File 'lib/dwh/adapters.rb', line 318 def extra_query_params config[:extra_query_params] || {} end |
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given a given table_name. Pass in optional catalog and schema info.
Example: metadata("public.big_table") metadata("big_table") metadata("big_table",schema: "public")
253 254 255 |
# File 'lib/dwh/adapters.rb', line 253 def (table, **qualifiers) raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'" end |
#reset_settings ⇒ Hash
This returns settings back to its original state prior to running alter_settings.
115 116 117 |
# File 'lib/dwh/adapters.rb', line 115 def reset_settings @settings = @original_settings if @original_settings end |
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.
211 212 213 |
# File 'lib/dwh/adapters.rb', line 211 def stats(table, date_column: nil, **qualifiers) raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'" end |
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
191 192 193 |
# File 'lib/dwh/adapters.rb', line 191 def stream(sql, &block) raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'" end |
#table?(table, **qualifiers) ⇒ Boolean
Check if table exists in remote db.
235 236 237 |
# File 'lib/dwh/adapters.rb', line 235 def table?(table, **qualifiers) tables(**qualifiers).include?(table) end |
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db. It will use the default catalog and schema config only specified here.
224 225 226 |
# File 'lib/dwh/adapters.rb', line 224 def tables(**qualifiers) raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'" end |
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database and returns true if successful, or raise Exception or false connection object or self
130 131 132 |
# File 'lib/dwh/adapters.rb', line 130 def test_connection(raise_exception: false) raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'" end |
#token_expired? ⇒ Boolean
For adapters that uses access tokens via jwt or oauth this will return wether a tokens is expired
325 326 327 |
# File 'lib/dwh/adapters.rb', line 325 def token_expired? @token_expires_at.nil? || Time.now >= @token_expires_at end |
#with_debug(sql, &block) ⇒ Object
Wraps an SQL execution with debug logging. It sill include execution time.
286 287 288 289 290 291 292 293 294 295 296 297 298 |
# File 'lib/dwh/adapters.rb', line 286 def with_debug(sql, &block) starting = Process.clock_gettime(Process::CLOCK_MONOTONIC) logger.debug("=== SQL === \n#{sql}") result = block.call ending = Process.clock_gettime(Process::CLOCK_MONOTONIC) elapsed = ending - starting logger.debug("=== FINISHED SQL (#{elapsed.round(1)} secs) ===") result end |
#with_retry(max_attempts = 2, &block) ⇒ Object
Will call the block with retries given by the max attempts param. If max attempts is 0, it will just return the block.call
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/dwh/adapters.rb', line 262 def with_retry(max_attempts = 2, &block) return block.call if max_attempts.zero? attempts = 0 begin attempts += 1 block.call rescue StandardError => e if attempts < max_attempts logger.warn "Attempt #{attempts} failed with error: #{e.}. Retrying..." retry else logger.error "Failed after #{attempts} attempts with error: #{e.}" raise end end end |