Class: DWH::Adapters::Snowflake

Inherits:
Adapter
  • Object
show all
Includes:
OpenAuthorizable
Defined in:
lib/dwh/adapters/snowflake.rb

Overview

Snowflake adapter for executing SQL queries against Snowflake databases.

Supports three authentication modes:

  • Personal Access Token (pat)
  • Key Pair Authentication (kp)
  • OAuth 2.0 (oauth)

Examples:

Basic connection with Personal Access Token

DWH.create(:snowflake, {
  auth_mode: 'pat',
  account_identifier: 'myorg-myaccount',
  personal_access_token: 'your-token-here',
  warehouse: 'COMPUTE_WH',
  database: 'ANALYTICS',
  schema: 'PUBLIC'
})

Connection with Key Pair Authentication

DWH.create(:snowflake, {
  auth_mode: 'kp',
  account_identifier: 'myorg-myaccount.us-east-1',
  username: 'john_doe',
  private_key: '/path/to/private_key.pem',
  warehouse: 'COMPUTE_WH',
  database: 'ANALYTICS'
})

Connecting with OAuth

DWH.create(:snowflake, {
  auth_mode: 'oauth',
  account_identifier: 'myorg-myaccount.us-east-1',
  oauth_client_id: '<YOUR_CLIENT_ID>',
  oauth_client_secret: '<YOUR_CLIENT_SECRET>',
  oauth_redirect_url: 'https://localhost:3030/some/path',
  database: 'ANALYTICS'
})

# This will only work if you set up an OAuth security integration
# and grant it to the correct users.

# Use this url to get auth code
adapter.authorization_url

# Pass the code to generate oauth tokens
adapter.generate_oauth_tokens(authorization_code)

# Apply previously created tokens for new connections
adapter.apply_oauth_tokens(access_token: token, refresh_token: token, expires_at: Time.now)

Constant Summary

AUTH_TOKEN_TYPES =

{
  pat: 'PROGRAMMATIC_ACCESS_TOKEN',
  kp: 'KEYPAIR_JWT',
  oauth: 'OAUTH'
}.freeze
API_ENDPOINTS =
{
  statements: '/api/v2/statements'
}.freeze
DEFAULT_PARAMETERS =
{
  DATE_OUTPUT_FORMAT: 'YYYY-MM-DD',
  TIMESTAMP_OUTPUT_FORMAT: 'YYYY-MM-DD HH24:MI:SS',
  TIMESTAMP_TZ_OUTPUT_FORMAT: 'YYYY-MM-DD HH24:MI:SS TZH',
  TIMESTAMP_NTZ_OUTPUT_FORMAT: 'YYYY-MM-DD HH24:MI:SS',
  TIMESTAMP_LTZ_OUTPUT_FORMAT: 'YYYY-MM-DD HH24:MI:SS TZH',
  TIME_OUTPUT_FORMAT: 'HH24:MI:SS'
}.freeze
DEFAULT_POLL_INTERVAL =
0.25
MAX_POLL_INTERVAL =
30
TOKEN_VALIDITY_SECONDS =
3600

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Instance Method Summary

Methods included from OpenAuthorizable

#apply_oauth_tokens, #authorization_url, #generate_oauth_tokens, included, #oauth_access_token, #oauth_authenticated?, #oauth_settings, #oauth_token_info, #oauth_tokenization_url, #refresh_access_token, #validate_oauth_config

Methods inherited from Adapter

#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

#initialize(config) ⇒ Snowflake

Returns a new instance of Snowflake.



130
131
132
133
# File 'lib/dwh/adapters/snowflake.rb', line 130

# Builds a Snowflake adapter from +config+.
#
# The parent Adapter constructor receives the config first; afterwards
# validate_auth_config (defined elsewhere in this class) checks the
# authentication-related settings.
#
# @param config [Hash] adapter configuration (see the class-level examples)
def initialize(config)
  super(config)
  validate_auth_config
end

Instance Method Details

#connection ⇒ Faraday::Connection

Creates (or reuses) the HTTP connection to the Snowflake SQL API and returns it.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/dwh/adapters/snowflake.rb', line 136

# Returns the Faraday client used to call the Snowflake SQL API.
#
# The client is memoized in @connection and reused until token_expired?
# reports true. When it does, reset_connection is invoked and a new client is
# built carrying the current auth token and its token-type header.
# @token_expires_at is only assigned when it is not already set
# (NOTE(review): presumably reset_connection clears it -- confirm).
#
# @return [Faraday::Connection]
def connection
  cached_and_valid = @connection && !token_expired?
  return @connection if cached_and_valid

  reset_connection if token_expired?
  @token_expires_at ||= Time.now + TOKEN_VALIDITY_SECONDS

  base_url = "https://#{config[:account_identifier]}.snowflakecomputing.com"
  auth_headers = {
    'Content-Type' => 'application/json',
    'Authorization' => "Bearer #{auth_token}",
    'X-Snowflake-Authorization-Token-Type' => auth_token_type,
    'User-Agent' => config[:client_name]
  }
  request_options = { timeout: config[:query_timeout] }.merge(extra_connection_params)

  @connection = Faraday.new(url: base_url, headers: auth_headers, request: request_options)
end

#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

    • array returns array of array
    • object returns array of Hashes
    • csv returns as csv
    • native returns the native result from any clients used
      • For example: Postgres using pg client will return PG::Result
      • HTTP clients return the HTTP response object
  • retries (Integer) (defaults to: 0)

    number of retries in case of failure. Default is 0

Returns:

  • (Array<Array>, Hash, CSV, Native)

Raises:



168
169
170
171
172
173
174
175
176
177
# File 'lib/dwh/adapters/snowflake.rb', line 168

# Runs +sql+ on Snowflake and returns the fetched rows in the requested format.
#
# The statement is submitted, the response handled, and the data fetched
# inside with_debug, all wrapped by with_retry, which receives retries + 1
# (the initial attempt plus the requested retries).
#
# @param sql [String] SQL to execute
# @param format [Symbol, String] result format passed to format_result
#   (e.g. :array, :object, :csv, :native)
# @param retries [Integer] number of retries on failure
# @return [Array<Array>, Hash, CSV, Object] the formatted result
def execute(sql, format: :array, retries: 0)
  attempts = retries + 1
  rows = with_retry(attempts) do
    with_debug(sql) { fetch_data(handle_query_response(submit_query(sql))) }
  end

  format_result(rows, format)
end

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO

Execute sql on the target database and stream the result data into io.

Parameters:

  • sql (String)

    actual sql

  • io (IO)

    IO-like sink that receives the result data; it is rewound before being returned

  • stats (Object) (defaults to: nil)

    optional stats object, passed through to the data fetcher

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure. Default is 0

Returns:

  • (IO)

    the io passed in, rewound to the start

Raises:



180
181
182
183
184
185
186
187
188
189
190
# File 'lib/dwh/adapters/snowflake.rb', line 180

# Executes +sql+ on Snowflake and streams the result data into +io+.
#
# @param sql [String] SQL to execute
# @param io [IO] IO-like sink handed to fetch_data; rewound before returning
# @param stats [Object, nil] passed through to fetch_data unchanged
# @param retries [Integer] number of retries on failure (default 0), with the
#   same meaning as in #execute
# @return [IO] the same +io+, rewound to the start
def execute_stream(sql, io, stats: nil, retries: 0)
  # #execute passes retries + 1 to with_retry, i.e. it treats the argument as
  # the total number of attempts. Passing +retries+ directly here meant
  # retries: N allowed one attempt fewer than #execute (retries: 1 never
  # retried), so use the same convention.
  # NOTE(review): a failed attempt may already have written partial data to
  # +io+ before the retry runs -- confirm whether io needs resetting.
  with_retry(retries + 1) do
    with_debug(sql) do
      response = submit_query(sql)
      fetch_data(handle_query_response(response), io: io, stats: stats)
    end
  end

  io.rewind
  io
end

#metadata(table, **qualifiers) ⇒ DWH::Table

Get column metadata for a table in the target db. The catalog defaults to the configured database unless one is specified here.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent name space. will be ignored if the adapter doesn't support

  • :schema (String)

    optional schema to scope to. will be ignored if the adapter doesn't support

Returns:

  • (DWH::Table)


226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/dwh/adapters/snowflake.rb', line 226

# Fetches column metadata for +table+ from the database's INFORMATION_SCHEMA.
#
# The catalog defaults to config[:database] when the resolved Table has no
# catalog. Table and schema names are upper-cased before comparison
# (Snowflake stores unquoted identifiers in upper case), and they are escaped
# before being embedded as SQL string literals. Columns are returned in their
# ordinal (declaration) order.
#
# @param table [String] table name
# @param qualifiers [Hash] optional :catalog and :schema
# @return [Table] the table with one Column appended per result row
def metadata(table, **qualifiers)
  catalog, schema = qualifiers.values_at(:catalog, :schema)
  db_table = Table.new(table, schema: schema, catalog: catalog)
  db = db_table.catalog || config[:database]

  # Double single quotes and backslashes so a name cannot terminate the
  # surrounding SQL string literal early (backslash is an escape character
  # in Snowflake single-quoted strings).
  escape = ->(value) { value.gsub(/[\\']/) { |ch| ch == "'" ? "''" : '\\\\' } }

  conditions = ["table_name = '#{escape.call(db_table.physical_name.upcase)}'"]
  conditions << "table_schema = '#{escape.call(db_table.schema.upcase)}'" if db_table.schema

  # Without ORDER BY, information_schema gives no guarantee on row order.
  sql = <<~SQL
    SELECT column_name, data_type, numeric_precision, numeric_scale, character_maximum_length
    FROM #{db}.information_schema.columns
    WHERE #{conditions.join(' AND ')}
    ORDER BY ordinal_position
  SQL

  execute(sql).each do |col|
    db_table << Column.new(
      name: col[0]&.downcase,
      data_type: col[1]&.downcase,
      precision: col[2],
      scale: col[3],
      max_char_length: col[4]
    )
  end

  db_table
end

#stats(table, date_column: nil) ⇒ TableStats

Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.

Examples:

stats("public.big_table", date_column: "fact_date")
stats("big_table")
stats("big_table", date_column: "created_at")

Parameters:

  • table (String)

    table name

  • date_column (String) (defaults to: nil)

    optional date column to use to find range

  • qualifiers (Hash)

    a customizable set of options

Returns:

Raises:



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/dwh/adapters/snowflake.rb', line 254

# Returns basic statistics for +table+: its row count and, when
# +date_column+ is given, the MIN/MAX of that column.
#
# @param table [String] table name, interpolated into the FROM clause as-is
# @param date_column [String, nil] column whose range should be reported
# @return [TableStats] row_count, date_start and date_end (the dates are
#   selected as SQL NULL when no date_column is given)
def stats(table, date_column: nil)
  range_select =
    if date_column
      ", MIN(#{date_column}) AS date_start, MAX(#{date_column}) AS date_end"
    else
      ', NULL AS date_start, NULL AS date_end'
    end

  first_row = execute("SELECT COUNT(*) AS row_count#{range_select} FROM #{table}").first
  row_count, date_start, date_end = first_row.values_at(0, 1, 2)

  TableStats.new(row_count: row_count, date_start: date_start, date_end: date_end)
end

#stream(sql) {|chunk| ... } ⇒ Object

Execute SQL query and yield streamed results

Parameters:

  • sql (String)

    SQL query to execute

Yields:

  • (chunk)

    yields each chunk of data as it's processed



195
196
197
198
199
200
# File 'lib/dwh/adapters/snowflake.rb', line 195

# Executes +sql+ and hands the given block to fetch_data so each chunk of
# result data can be yielded as it is processed.
#
# Unlike #execute, no retry wrapper is applied here.
#
# @param sql [String] SQL query to execute
# @yield [chunk] each chunk of data as it's processed
# @return [Object] whatever fetch_data returns
def stream(sql, &block)
  with_debug(sql) do
    query_handle = handle_query_response(submit_query(sql))
    fetch_data(query_handle, proc: block)
  end
end

#tables(**qualifiers) ⇒ Array<String>

Get all tables available in the target db. The catalog defaults to the configured database unless one is specified here; results are filtered by schema only when one is given.

For metadata queries table_catalog and database are the same in the Snowflake information_schema.

However, we need to prefix the information_schema table with the db name to correctly constrain to the target db.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent name space. will be ignored if the adapter doesn't support

  • :schema (String)

    optional schema to scope to. will be ignored if the adapter doesn't support

Returns:

  • (Array<String>)

    list of table names



210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/dwh/adapters/snowflake.rb', line 210

# Lists the table names in the target database.
#
# The catalog defaults to config[:database]. Results are restricted to a
# schema only when :schema is given. The information_schema view is prefixed
# with the database name so the query is scoped to that database.
#
# @param qualifiers [Hash] optional :catalog and :schema
# @return [Array<String>] list of table names
def tables(**qualifiers)
  catalog, schema = qualifiers.values_at(:catalog, :schema)
  db = catalog || config[:database]

  sql = "SELECT table_name FROM #{db}.information_schema.tables"
  if schema
    # Double single quotes and backslashes so the schema name cannot break
    # out of the SQL string literal (backslash is an escape character in
    # Snowflake single-quoted strings).
    escaped_schema = schema.upcase.gsub(/[\\']/) { |ch| ch == "'" ? "''" : '\\\\' }
    sql += " WHERE table_schema = '#{escaped_schema}'"
  end

  execute(sql).flatten
end

#test_connection(raise_exception: false) ⇒ Boolean

Tests the connection to the target database by running SELECT 1. Returns true if successful; otherwise raises ConnectionError when raise_exception is true, or logs the failure and returns false.

Returns:

Raises:



157
158
159
160
161
162
163
164
165
# File 'lib/dwh/adapters/snowflake.rb', line 157

# Checks connectivity by running a trivial query.
#
# @param raise_exception [Boolean] when true, failures are re-raised as
#   ConnectionError instead of being logged
# @return [Boolean] true when SELECT 1 succeeds, false on a logged failure
# @raise [ConnectionError] if the query fails and raise_exception is true
def test_connection(raise_exception: false)
  execute('SELECT 1')
  true
rescue StandardError => e
  if raise_exception
    raise ConnectionError, "Failed to connect to Snowflake: #{e.message}"
  else
    logger.error "Connection test failed: #{e.message}"
    false
  end
end