Class: DWH::Adapters::Snowflake
- Includes:
- OpenAuthorizable
- Defined in:
- lib/dwh/adapters/snowflake.rb
Overview
Snowflake adapter for executing SQL queries against Snowflake databases.
Supports three authentication modes:
- Personal Access Token (pat)
- Key Pair Authentication (kp)
- OAuth 2.0 (oauth)
Constant Summary
- AUTH_TOKEN_TYPES =
Constants
{ pat: 'PROGRAMMATIC_ACCESS_TOKEN', kp: 'KEYPAIR_JWT', oauth: 'OAUTH' }.freeze
- API_ENDPOINTS =
{ statements: '/api/v2/statements' }.freeze
- DEFAULT_PARAMETERS =
{ DATE_OUTPUT_FORMAT: 'YYYY-MM-DD', TIMESTAMP_OUTPUT_FORMAT: 'YYYY-MM-DD HH24:MI:SS', TIMESTAMP_TZ_OUTPUT_FORMAT: 'YYYY-MM-DD HH24:MI:SS TZH', TIMESTAMP_NTZ_OUTPUT_FORMAT: 'YYYY-MM-DD HH24:MI:SS', TIMESTAMP_LTZ_OUTPUT_FORMAT: 'YYYY-MM-DD HH24:MI:SS TZH', TIME_OUTPUT_FORMAT: 'HH24:MI:SS' }.freeze
- DEFAULT_POLL_INTERVAL =
0.25
- MAX_POLL_INTERVAL =
30
- TOKEN_VALIDITY_SECONDS =
3600
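The polling code itself is not shown on this page. As a purely illustrative sketch, one way the two interval constants could be combined is a doubling backoff that starts at DEFAULT_POLL_INTERVAL and is capped at MAX_POLL_INTERVAL. The `poll_intervals` helper and the doubling factor are assumptions made for this example and are not part of DWH.

```ruby
# Values copied from the constant summary above.
DEFAULT_POLL_INTERVAL = 0.25
MAX_POLL_INTERVAL = 30

# Hypothetical helper: returns the first `count` wait times (in seconds)
# of a doubling backoff, never exceeding MAX_POLL_INTERVAL.
# The doubling factor is an assumption, not the adapter's documented schedule.
def poll_intervals(count)
  interval = DEFAULT_POLL_INTERVAL
  Array.new(count) do
    current = interval
    interval = [interval * 2, MAX_POLL_INTERVAL].min
    current
  end
end

p poll_intervals(9)
```

A capped schedule like this keeps short queries responsive while avoiding a tight request loop for long-running statements.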
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Instance Method Summary
-
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self.
-
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
-
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database and stream the results into the given io object.
-
#initialize(config) ⇒ Snowflake
constructor
A new instance of Snowflake.
-
#metadata(table, **qualifiers) ⇒ Array<String>
Get column metadata (name, data type, precision, scale, max character length) for the given table.
-
#stats(table, date_column: nil) ⇒ DWH::Table
Returns basic stats of a given table.
-
#stream(sql) {|chunk| ... } ⇒ Object
Execute SQL query and yield streamed results.
-
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db.
-
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database. Returns true if successful; on failure, raises ConnectionError when raise_exception is true, otherwise returns false.
Methods included from OpenAuthorizable
#apply_oauth_tokens, #authorization_url, #generate_oauth_tokens, included, #oauth_access_token, #oauth_authenticated?, #oauth_settings, #oauth_token_info, #oauth_tokenization_url, #refresh_access_token, #validate_oauth_config
Methods inherited from Adapter
#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
#initialize(config) ⇒ Snowflake
Returns a new instance of Snowflake.
    # File 'lib/dwh/adapters/snowflake.rb', line 130
    def initialize(config)
      super
      validate_auth_config
    end
Instance Method Details
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self. The Faraday connection is cached and rebuilt once the auth token has expired.
    # File 'lib/dwh/adapters/snowflake.rb', line 136
    def connection
      return @connection if @connection && !token_expired?

      reset_connection if token_expired?
      @token_expires_at ||= Time.now + TOKEN_VALIDITY_SECONDS

      @connection = Faraday.new(
        url: "https://#{config[:account_identifier]}.snowflakecomputing.com",
        headers: {
          'Content-Type' => 'application/json',
          'Authorization' => "Bearer #{auth_token}",
          'X-Snowflake-Authorization-Token-Type' => auth_token_type,
          'User-Agent' => config[:client_name]
        },
        request: {
          timeout: config[:query_timeout]
        }.merge(extra_connection_params)
      )
    end
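The caching behaviour of #connection can be illustrated in isolation. The sketch below is a hypothetical stand-in, not the DWH class: `CachedClient`, its injectable `clock`, the `builds` counter, and the bodies of `token_expired?` and `reset_connection` are all assumptions, since the real implementations of those two methods are not shown on this page.

```ruby
# Hypothetical stand-in that mimics the memoize-until-expiry pattern.
class CachedClient
  TOKEN_VALIDITY_SECONDS = 3600

  attr_reader :builds

  # `clock` is injectable so expiry can be exercised without waiting.
  def initialize(clock: -> { Time.now })
    @clock = clock
    @builds = 0
  end

  # Assumed semantics: expired once the clock reaches the stored expiry time.
  def token_expired?
    !@token_expires_at.nil? && @clock.call >= @token_expires_at
  end

  def connection
    return @connection if @connection && !token_expired?

    reset_connection if token_expired?
    @token_expires_at ||= @clock.call + TOKEN_VALIDITY_SECONDS
    @builds += 1
    @connection = Object.new # placeholder for Faraday.new(...)
  end

  private

  def reset_connection
    @connection = nil
    @token_expires_at = nil
  end
end

now = Time.at(0)
client = CachedClient.new(clock: -> { now })
first = client.connection
puts client.connection.equal?(first) # same object while the token is valid
now = Time.at(3600)
puts client.connection.equal?(first) # a new object after expiry
```

Reusing one connection object avoids regenerating the auth token on every query, while the expiry check bounds how long a stale token can be sent.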
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
    # File 'lib/dwh/adapters/snowflake.rb', line 168
    def execute(sql, format: :array, retries: 0)
      result = with_retry(retries + 1) do
        with_debug(sql) do
          response = submit_query(sql)
          fetch_data(handle_query_response(response))
        end
      end

      format_result(result, format)
    end
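#execute wraps the query in with_retry, which is inherited from Adapter and not shown on this page. The following is a minimal sketch of what a retry wrapper of this shape might look like, assuming the argument means the maximum number of attempts; the real DWH helper may count attempts differently or add delays between them.

```ruby
# Hypothetical retry helper; DWH's own with_retry may behave differently.
# Runs the block up to `max_attempts` times and re-raises the last error.
def with_retry(max_attempts)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue StandardError
    retry if attempts < max_attempts
    raise
  end
end

calls = 0
result = with_retry(3) do
  calls += 1
  raise 'transient failure' if calls < 3

  :ok
end
puts "#{result} after #{calls} calls"
```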
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database and stream the results into the given io object. The io is rewound and returned.
    # File 'lib/dwh/adapters/snowflake.rb', line 180
    def execute_stream(sql, io, stats: nil, retries: 0)
      with_retry(retries) do
        with_debug(sql) do
          response = submit_query(sql)
          fetch_data(handle_query_response(response), io: io, stats: stats)
        end
      end

      io.rewind
      io
    end
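The write-then-rewind pattern used by #execute_stream can be shown with a StringIO. In this sketch `write_rows` is a hypothetical stand-in for the adapter's fetch step, and the comma-joined line format is an assumption; the format DWH actually writes is not documented here.

```ruby
require 'stringio'

# Hypothetical writer: emits one comma-joined line per row, then rewinds
# so the caller can read from the start of the io.
def write_rows(rows, io)
  rows.each { |row| io.puts(row.join(',')) }
  io.rewind
  io
end

io = write_rows([[1, 'a'], [2, 'b']], StringIO.new)
puts io.read
```

Without the rewind, the caller would receive an io positioned at the end of the data and an immediate read would return nothing.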
#metadata(table, **qualifiers) ⇒ Array<String>
Get column metadata for the given table. Uses the configured database as the catalog unless one is specified here, and filters by schema only when the table has one.
    # File 'lib/dwh/adapters/snowflake.rb', line 226
    def metadata(table, **qualifiers)
      catalog, schema = qualifiers.values_at(:catalog, :schema)
      db_table = Table.new(table, schema: schema, catalog: catalog)
      db = db_table.catalog || config[:database]

      sql = <<~SQL
        SELECT column_name, data_type, numeric_precision, numeric_scale, character_maximum_length
        FROM #{db}.information_schema.columns
      SQL

      conditions = ["table_name = '#{db_table.physical_name.upcase}'"]
      conditions << "table_schema = '#{db_table.schema.upcase}'" if db_table.schema

      columns = execute("#{sql} WHERE #{conditions.join(' AND ')}")

      columns.each do |col|
        db_table << Column.new(
          name: col[0]&.downcase,
          data_type: col[1]&.downcase,
          precision: col[2],
          scale: col[3],
          max_char_length: col[4]
        )
      end

      db_table
    end
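The row-to-column mapping at the end of #metadata can be sketched on its own. `ColumnInfo` and `to_column` below are hypothetical names used only for illustration; DWH's own Column class is not reproduced here, and the sample rows are made up.

```ruby
# Hypothetical value object standing in for DWH's Column.
ColumnInfo = Struct.new(:name, :data_type, :precision, :scale, :max_char_length,
                        keyword_init: true)

# Maps one information_schema.columns row (in SELECT order) to a ColumnInfo.
# Names and types are downcased; nil values pass through via safe navigation.
def to_column(row)
  ColumnInfo.new(
    name: row[0]&.downcase,
    data_type: row[1]&.downcase,
    precision: row[2],
    scale: row[3],
    max_char_length: row[4]
  )
end

rows = [['ID', 'NUMBER', 38, 0, nil], ['NAME', 'TEXT', nil, nil, 255]]
rows.each { |row| p to_column(row).to_h }
```

The lookup values are upcased in the query because Snowflake stores unquoted identifiers in upper case, and the results are downcased again on the way out.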
#stats(table, date_column: nil) ⇒ DWH::Table
Returns basic stats of a given table as a TableStats object with row_count, date_start, and date_end. The date fields are NULL when no date_column is given.
    # File 'lib/dwh/adapters/snowflake.rb', line 254
    def stats(table, date_column: nil)
      date_fields = if date_column
                      ", MIN(#{date_column}) AS date_start, MAX(#{date_column}) AS date_end"
                    else
                      ', NULL AS date_start, NULL AS date_end'
                    end

      data = execute("SELECT COUNT(*) AS row_count#{date_fields} FROM #{table}")
      cols = data.first

      TableStats.new(
        row_count: cols[0],
        date_start: cols[1],
        date_end: cols[2]
      )
    end
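To see the SQL that #stats sends, the string construction can be pulled into a standalone helper. `stats_sql` is a hypothetical name for this sketch, and the table and column names in the usage lines are made up.

```ruby
# Hypothetical helper reproducing the query string built inside #stats.
def stats_sql(table, date_column: nil)
  date_fields = if date_column
                  ", MIN(#{date_column}) AS date_start, MAX(#{date_column}) AS date_end"
                else
                  ', NULL AS date_start, NULL AS date_end'
                end

  "SELECT COUNT(*) AS row_count#{date_fields} FROM #{table}"
end

puts stats_sql('orders')
puts stats_sql('orders', date_column: 'created_at')
```

Both variants return three columns in the same order, which is why the method can read the result positionally.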
#stream(sql) {|chunk| ... } ⇒ Object
Execute SQL query and yield streamed results to the given block, one chunk at a time.
    # File 'lib/dwh/adapters/snowflake.rb', line 195
    def stream(sql, &block)
      with_debug(sql) do
        response = submit_query(sql)
        fetch_data(handle_query_response(response), proc: block)
      end
    end
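The block-based interface of #stream can be illustrated with an in-memory source. `stream_chunks` is a hypothetical stand-in for the adapter's fetch step, and treating a chunk as an array of rows is an assumption; the actual chunk shape yielded by DWH is not specified on this page.

```ruby
# Hypothetical chunk source: yields rows to the caller's block in
# fixed-size groups instead of returning them all at once.
def stream_chunks(rows, chunk_size, &block)
  rows.each_slice(chunk_size, &block)
end

total = 0
stream_chunks([[1], [2], [3]], 2) do |chunk|
  total += chunk.size # process each chunk as it arrives
end
puts total
```

Processing chunk by chunk keeps memory use bounded by the chunk size rather than by the full result set.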
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db. Uses the configured database unless a catalog is specified here, and filters by schema only when one is given.
For metadata queries table_catalog and database are the same in the Snowflake information_schema.
However, we need to prefix the information_schema table with the db name to correctly constrain to the target db.
    # File 'lib/dwh/adapters/snowflake.rb', line 210
    def tables(**qualifiers)
      catalog, schema = qualifiers.values_at(:catalog, :schema)

      db = catalog || config[:database]
      sql = "SELECT table_name FROM #{db}.information_schema.tables"
      conditions = []

      conditions << "table_schema = '#{schema.upcase}'" if schema

      sql += " WHERE #{conditions.join(' AND ')}" if conditions.any?

      result = execute(sql)
      result.flatten
    end
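The effect of the database prefix and the optional schema filter is easiest to see in the generated SQL. `tables_sql` is a hypothetical helper written for this sketch, and the database and schema names are made up.

```ruby
# Hypothetical helper reproducing the query string built inside #tables.
# `db` plays the role of `catalog || config[:database]`.
def tables_sql(db, schema: nil)
  sql = "SELECT table_name FROM #{db}.information_schema.tables"
  sql += " WHERE table_schema = '#{schema.upcase}'" if schema
  sql
end

puts tables_sql('analytics')
puts tables_sql('analytics', schema: 'public')
```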
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database by running SELECT 1. Returns true if successful; on failure, raises ConnectionError when raise_exception is true, otherwise logs the error and returns false.
    # File 'lib/dwh/adapters/snowflake.rb', line 157
    def test_connection(raise_exception: false)
      execute('SELECT 1')
      true
    rescue StandardError => e
      raise ConnectionError, "Failed to connect to Snowflake: #{e.message}" if raise_exception

      logger.error "Connection test failed: #{e.message}"
      false
    end
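The two failure modes of #test_connection can be demonstrated without a Snowflake account. In this sketch `check_connection` and the locally defined `ConnectionError` are hypothetical stand-ins, and the block passed in replaces the real `execute('SELECT 1')` call.

```ruby
# Local stand-in for the library's ConnectionError.
class ConnectionError < StandardError; end

# Hypothetical helper with the same control flow as #test_connection:
# true on success, false on failure, or an exception when asked for one.
def check_connection(raise_exception: false)
  yield
  true
rescue StandardError => e
  raise ConnectionError, "Failed to connect: #{e.message}" if raise_exception

  false
end

puts check_connection { :ok }
puts check_connection { raise 'network down' }

begin
  check_connection(raise_exception: true) { raise 'network down' }
rescue ConnectionError => e
  puts e.message
end
```

The boolean form suits health checks, while raise_exception: true is more useful when the caller needs the underlying error message.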