Class: DWH::Adapters::Athena

Inherits:
Adapter
  • Object
Defined in:
lib/dwh/adapters/athena.rb

Overview

AWS Athena adapter. Ensure the aws-sdk-athena and aws-sdk-s3 gems are available before using this adapter. Adapters should generally be created with DWH.create, which accepts the configuration as an options hash or argument list.

Examples:

Basic connection with required options

DWH.create(:athena, {
  region: 'us-east-1',
  database: 'default',
  s3_output_location: 's3://my-athena-results-bucket/queries/',
  access_key_id: 'AKIAIOSFODNN7EXAMPLE',
  secret_access_key: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
})

Connection with IAM role (recommended)

DWH.create(:athena, {
  region: 'us-east-1',
  database: 'default',
  s3_output_location: 's3://my-athena-results-bucket/queries/'
})

Connection with workgroup

DWH.create(:athena, {
  region: 'us-east-1',
  database: 'default',
  s3_output_location: 's3://my-athena-results-bucket/queries/',
  workgroup: 'my-workgroup'
})

Constant Summary

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Instance Method Summary

Methods inherited from Adapter

#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

This class inherits a constructor from DWH::Adapters::Adapter

Instance Method Details

#connection ⇒ Object

Creates a connection to the target database and returns the connection object or self



# File 'lib/dwh/adapters/athena.rb', line 44

def connection
  return @connection if @connection

  aws_config = {
    region: config[:region],
    workgroup: config[:workgroup]
  }.compact

  # Add credentials if provided, otherwise rely on IAM role or environment
  if config[:access_key_id] && config[:secret_access_key]
    aws_config[:credentials] = Aws::Credentials.new(
      config[:access_key_id],
      config[:secret_access_key]
    )
  end

  # Merge any extra connection params
  aws_config.merge!(extra_connection_params)

  @connection = Aws::Athena::Client.new(aws_config)
  @s3_output_location = Aws::S3::Client.new(aws_config)

  @connection
rescue StandardError => e
  raise ConfigError, "Failed to connect to Athena: #{e.message}"
end
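
The way the client options are assembled can be sketched without the AWS SDK. This is a minimal illustration, not the adapter's code: `build_aws_config` is a hypothetical helper, and the credentials are kept as a plain Hash instead of an `Aws::Credentials` object so the snippet runs standalone.

```ruby
# Hypothetical helper mirroring how #connection assembles client options.
def build_aws_config(config, extra = {})
  aws_config = {
    region: config[:region],
    workgroup: config[:workgroup]
  }.compact # drops keys whose value is nil

  # Static keys are used only when both are present; otherwise the
  # environment or an IAM role is expected to supply credentials.
  if config[:access_key_id] && config[:secret_access_key]
    aws_config[:credentials] = {
      access_key_id: config[:access_key_id],
      secret_access_key: config[:secret_access_key]
    }
  end

  aws_config.merge(extra)
end

with_keys = build_aws_config({ region: 'us-east-1',
                               access_key_id: 'EXAMPLE_KEY_ID',
                               secret_access_key: 'EXAMPLE_SECRET' })
with_role = build_aws_config({ region: 'us-east-1' })
```

With only a region supplied, the resulting options contain neither `:credentials` nor `:workgroup`, which corresponds to the IAM role example in the overview.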

#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

    • array returns array of array
    • object returns array of Hashes
    • csv returns as csv
    • native returns the native result from any clients used
      • For example: Postgres using pg client will return PG::Result
      • HTTP clients will return the HTTP response object
  • retries (Integer) (defaults to: 0)

    number of retries in case of failure. Default is 0

Returns:

  • (Array<Array>, Hash, CSV, Native)

Raises:



# File 'lib/dwh/adapters/athena.rb', line 155

def execute(sql, format: :array, retries: 0)
  begin
    result_data = with_debug(sql) { with_retry(retries) { execute_query(sql) } }
  rescue ExecutionError
    raise
  rescue StandardError => e
    raise ExecutionError, "Athena query failed: #{e.message}"
  end

  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array
    result_data[:rows]
  when :object
    headers = result_data[:headers]
    result_data[:rows].map { |row| Hash[headers.zip(row)] }
  when :csv
    rows_to_csv(result_data[:headers], result_data[:rows])
  when :native
    result_data
  else
    raise UnsupportedCapability, "Unsupported format: #{format} for Athena adapter"
  end
end
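
The effect of the format option can be shown in isolation. The sketch below assumes the intermediate result is a Hash with :headers and :rows, as the listing above suggests; `format_result` is a hypothetical stand-in, and the :csv branch is omitted because the adapter's `rows_to_csv` helper is not shown on this page.

```ruby
# Hypothetical stand-in for the formatting step of #execute.
# result_data is assumed to look like { headers: [...], rows: [[...], ...] }.
def format_result(result_data, format)
  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array
    result_data[:rows]
  when :object
    headers = result_data[:headers]
    # Pair each header with the value in the same position of the row.
    result_data[:rows].map { |row| headers.zip(row).to_h }
  when :native
    result_data
  else
    raise ArgumentError, "Unsupported format: #{format}"
  end
end

result_data = { headers: %w[id name], rows: [[1, 'a'], [2, 'b']] }
as_array  = format_result(result_data, :array)
as_object = format_result(result_data, 'OBJECT') # strings are downcased first
```

Note that :object rows are keyed by String header names, which is why later listings read values such as `first_row['row_count']`.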

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO

Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.

Parameters:

  • sql (String)

    actual sql

  • io (IO)

    IO object to write records to

  • stats (StreamingStats) (defaults to: nil)

    optional; collects stats and preview data

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure

Returns:

  • (IO)

Raises:



# File 'lib/dwh/adapters/athena.rb', line 181

def execute_stream(sql, io, stats: nil, retries: 0)
  with_debug(sql) do
    with_retry(retries) do
      execute_query(sql, io: io, stats: stats)
    end
  end
rescue StandardError => e
  raise ExecutionError, "Athena streaming query failed: #{e.message}"
end

#metadata(table, **qualifiers) ⇒ DWH::Table

Get the schema structure of a given table_name. Pass in optional catalog and schema info.

Examples:

metadata("public.big_table")
metadata("big_table")
metadata("big_table", schema: "public")

Parameters:

  • table (String)
    • table name
  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:



# File 'lib/dwh/adapters/athena.rb', line 121

def metadata(table, **qualifiers)
  schema = qualifiers[:database] || qualifiers[:schema] || config[:database]
  catalog = qualifiers[:catalog] || config[:catalog]
  db_table = Table.new table, schema: schema, catalog: catalog

  sql = 'SELECT * FROM information_schema.columns'
  wheres = ["WHERE table_name = '#{db_table.physical_name}'"]

  wheres << "table_schema = '#{db_table.schema}'" if db_table.schema
  wheres << "table_catalog = '#{db_table.catalog}'" if db_table.catalog

  cols = execute("#{sql} \n #{wheres.join(' AND ')}", format: :object)
  cols.each do |col|
    # Athena DESCRIBE returns different column names than standard information_schema
    column_name = col['col_name'] || col['column_name']
    data_type = col['data_type']

    # Parse Athena data types (e.g., "varchar(255)", "decimal(10,2)")
    precision, scale = parse_data_type_precision(data_type)
    max_char_length = parse_char_length(data_type)

    db_table << Column.new(
      name: column_name,
      data_type: data_type,
      precision: precision,
      scale: scale,
      max_char_length: max_char_length
    )
  end

  db_table
end
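
The private helpers parse_data_type_precision and parse_char_length are not shown on this page. As a rough illustration of what such parsing could look like, the sketch below uses two hypothetical functions with different names; the regular expressions are assumptions, not the adapter's implementation.

```ruby
# Hypothetical helpers; the adapter's own parsing code is not shown here.

# Returns [precision, scale] for types such as "decimal(10,2)",
# or [nil, nil] for any other type string.
def precision_and_scale_of(data_type)
  match = data_type.to_s.match(/\Adecimal\((\d+)\s*,\s*(\d+)\)\z/i)
  match ? [match[1].to_i, match[2].to_i] : [nil, nil]
end

# Returns the declared length for "varchar(255)" or "char(10)",
# or nil for any other type string.
def char_length_of(data_type)
  match = data_type.to_s.match(/\A(?:var)?char\((\d+)\)\z/i)
  match && match[1].to_i
end

decimal_parts = precision_and_scale_of('decimal(10,2)')
varchar_len   = char_length_of('varchar(255)')
```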

#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table

Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.

Examples:

stats("public.big_table", date_column: "fact_date")
stats("big_table")
stats("big_table",schema: "public")

Parameters:

  • table (String)

    table name

  • date_column (String) (defaults to: nil)

    optional date column to use to find range

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:

Raises:



# File 'lib/dwh/adapters/athena.rb', line 97

def stats(table, date_column: nil, **qualifiers)
  database_name = qualifiers[:database] || config[:database]
  full_table_name = "#{database_name}.#{table}"

  sql_parts = ['SELECT COUNT(*) as row_count']

  if date_column
    sql_parts << ", MIN(#{date_column}) as date_start"
    sql_parts << ", MAX(#{date_column}) as date_end"
  end

  sql = "#{sql_parts.join} FROM #{full_table_name}"

  result = execute(sql, format: :object)
  first_row = result.first || {}

  TableStats.new(
    row_count: first_row['row_count'],
    date_start: first_row['date_start'],
    date_end: first_row['date_end']
  )
end
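
To see the SQL this method produces, the string-building part can be isolated. `build_stats_sql` is a hypothetical helper that copies the logic from the listing above, and the database and table names are example values.

```ruby
# Hypothetical helper reproducing the SQL assembly from #stats.
def build_stats_sql(database_name, table, date_column: nil)
  sql_parts = ['SELECT COUNT(*) as row_count']

  # The date range columns are added only when a date column is given.
  if date_column
    sql_parts << ", MIN(#{date_column}) as date_start"
    sql_parts << ", MAX(#{date_column}) as date_end"
  end

  "#{sql_parts.join} FROM #{database_name}.#{table}"
end

count_only = build_stats_sql('default', 'big_table')
with_dates = build_stats_sql('default', 'big_table', date_column: 'fact_date')
```

Because the table name is always prefixed with the database name, a table argument that already carries a qualifier is prefixed as well.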

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Parameters:

  • sql (String)

    actual sql

Yields:

  • (chunk)

    Yields a streamed chunk as it streams in. The chunk type might vary depending on the target db and settings

Raises:



# File 'lib/dwh/adapters/athena.rb', line 192

def stream(sql, &block)
  with_debug(sql) do
    result_data = execute_query(sql)

    result_data[:rows].each do |row|
      block.call(row)
    end
  end
end

#tables(**qualifiers) ⇒ Array<String>

Get all tables available in the target db. It uses the default catalog and schema from the config unless they are specified here.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:

  • (Array<String>)


# File 'lib/dwh/adapters/athena.rb', line 83

def tables(**qualifiers)
  schema = qualifiers[:database] || qualifiers[:schema] || config[:database]
  catalog = qualifiers[:catalog] || config[:catalog]

  sql = 'SELECT table_name FROM information_schema.tables'
  wheres = ['WHERE 1=1']
  wheres << "table_catalog = '#{catalog}'"
  wheres << "table_schema = '#{schema}'"

  result = execute("#{sql} #{wheres.join(' AND ')}", format: :array)
  result.flatten
end
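
The query issued by this method, and the effect of the final flatten, can be reproduced standalone. `build_tables_sql` is a hypothetical helper copying the listing above; the catalog, schema, and table names are example values.

```ruby
# Hypothetical helper reproducing the SQL assembly from #tables.
def build_tables_sql(catalog:, schema:)
  sql = 'SELECT table_name FROM information_schema.tables'
  wheres = ['WHERE 1=1']
  wheres << "table_catalog = '#{catalog}'"
  wheres << "table_schema = '#{schema}'"
  "#{sql} #{wheres.join(' AND ')}"
end

tables_sql = build_tables_sql(catalog: 'awsdatacatalog', schema: 'default')

# With format: :array each row is a one-element array, so flatten
# turns the result into a plain list of table names.
table_names = [['orders'], ['customers']].flatten
```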

#test_connection(raise_exception: false) ⇒ Boolean

Tests the connection to the target database. Returns true if successful; otherwise returns false, or raises ConnectionError when raise_exception is true.

Returns:

Raises:



# File 'lib/dwh/adapters/athena.rb', line 72

def test_connection(raise_exception: false)
  # Test connection by listing workgroups
  connection.list_work_groups(max_results: 1)
  true
rescue StandardError => e
  raise ConnectionError, "Athena connection test failed: #{e.message}" if raise_exception

  false
end
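
The boolean-or-raise behaviour can be exercised without an Athena client. In this sketch `check_connection` is a hypothetical function that takes the probe as a block, and `ConnectionError` is a local stand-in for the library's error class.

```ruby
# Local stand-in for the library's error class.
class ConnectionError < StandardError; end

# Hypothetical function with the same control flow as #test_connection;
# the block plays the role of connection.list_work_groups(max_results: 1).
def check_connection(raise_exception: false)
  yield
  true
rescue StandardError => e
  raise ConnectionError, "Athena connection test failed: #{e.message}" if raise_exception

  false
end

ok     = check_connection { :pong }
failed = check_connection { raise 'no credentials' }
```

Passing `raise_exception: true` turns the failing case into a ConnectionError that carries the original message.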

#valid_config? ⇒ Boolean

Returns:



# File 'lib/dwh/adapters/athena.rb', line 202

def valid_config?
  super
  require 'aws-sdk-athena'
rescue LoadError
  raise ConfigError, "Required 'aws-sdk-athena' and 'aws-sdk-s3' gems missing. Please add them to your Gemfile."
end