Class: DWH::Adapters::Athena
Defined in: lib/dwh/adapters/athena.rb
Overview
AWS Athena adapter. Please ensure the aws-sdk-athena and aws-sdk-s3 gems are available before using this adapter. Generally, adapters should be created using DWH.create, with the configuration passed in as an options hash or argument list.
Constant Summary
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Instance Method Summary
- #connection ⇒ Object
  Creates a connection to the target database and returns the connection object or self.
- #execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
  Execute sql on the target database.
- #execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
  Execute sql and stream responses back.
- #metadata(table, **qualifiers) ⇒ DWH::Table
  Get the schema structure of a given table_name.
- #stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
  Returns basic stats of a given table.
- #stream(sql) {|chunk| ... } ⇒ Object
  Executes the given sql and yields the streamed results to the given block.
- #tables(**qualifiers) ⇒ Array<String>
  Get all tables available in the target db.
- #test_connection(raise_exception: false) ⇒ Boolean
  Tests the connection to the target database and returns true if successful; otherwise raises an exception or returns false.
- #valid_config? ⇒ Boolean
Methods inherited from Adapter
#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
This class inherits a constructor from DWH::Adapters::Adapter
Instance Method Details
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self
# File 'lib/dwh/adapters/athena.rb', line 44
def connection
  return @connection if @connection

  aws_config = {
    region: config[:region],
    workgroup: config[:workgroup]
  }.compact

  # Add credentials if provided, otherwise rely on IAM role or environment
  if config[:access_key_id] && config[:secret_access_key]
    aws_config[:credentials] = Aws::Credentials.new(
      config[:access_key_id],
      config[:secret_access_key]
    )
  end

  # Merge any extra connection params
  aws_config.merge!(extra_connection_params)

  @connection = Aws::Athena::Client.new(aws_config)
  @s3_output_location = Aws::S3::Client.new(aws_config)

  @connection
rescue StandardError => e
  raise ConfigError, "Failed to connect to Athena: #{e.message}"
end
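To see which options #connection actually hands to the AWS clients, the option-building steps can be pulled out into plain Ruby. This is a minimal sketch under stated assumptions: build_aws_config is a hypothetical helper, and credentials are modeled as a plain Hash instead of Aws::Credentials so the snippet runs without the AWS gems.

```ruby
# Hypothetical helper mirroring how #connection assembles its client options.
# Assumption: credentials are a plain Hash here; the adapter itself builds
# Aws::Credentials.new(access_key_id, secret_access_key).
def build_aws_config(config, extra_connection_params = {})
  # .compact drops keys whose value is nil (e.g. an unset workgroup)
  aws_config = { region: config[:region], workgroup: config[:workgroup] }.compact

  # Static credentials are attached only when both halves are present;
  # otherwise the SDK's default credential lookup is left to do its work.
  if config[:access_key_id] && config[:secret_access_key]
    aws_config[:credentials] = {
      access_key_id: config[:access_key_id],
      secret_access_key: config[:secret_access_key]
    }
  end

  # Extra connection params are merged last, so they override earlier keys.
  aws_config.merge(extra_connection_params)
end

cfg = build_aws_config({ region: "us-east-1", access_key_id: "example-key", secret_access_key: "example-secret" })
p cfg.keys # => [:region, :credentials]
```

Because `.compact` drops nil entries, an unset region or workgroup never reaches the client, and anything in extra_connection_params wins over the keys set before the merge.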
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
# File 'lib/dwh/adapters/athena.rb', line 155
def execute(sql, format: :array, retries: 0)
  begin
    result_data = with_debug(sql) { with_retry(retries) { execute_query(sql) } }
  rescue ExecutionError
    raise
  rescue StandardError => e
    raise ExecutionError, "Athena query failed: #{e.message}"
  end

  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array
    result_data[:rows]
  when :object
    headers = result_data[:headers]
    result_data[:rows].map { |row| Hash[headers.zip(row)] }
  when :csv
    rows_to_csv(result_data[:headers], result_data[:rows])
  when :native
    result_data
  else
    raise UnsupportedCapability, "Unsupported format: #{format} for Athena adapter"
  end
end
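The format dispatch at the end of #execute does not depend on Athena itself, so it can be exercised over an in-memory result. The sketch assumes the `{ headers:, rows: }` shape that #execute reads from execute_query. format_result is a hypothetical name, the CSV branch uses Ruby's stdlib CSV in place of the adapter's rows_to_csv helper (not shown in this listing), and ArgumentError stands in for UnsupportedCapability.

```ruby
require "csv"

# Hypothetical stand-in for the case statement in #execute.
# result_data is assumed to look like { headers: [...], rows: [[...], ...] }.
def format_result(result_data, format)
  # Accept "Object", "csv", :array, ... just like #execute does
  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array
    result_data[:rows]
  when :object
    # One Hash per row, keyed by column name
    result_data[:rows].map { |row| result_data[:headers].zip(row).to_h }
  when :csv
    # Assumption: stdlib CSV replaces the adapter's rows_to_csv helper
    CSV.generate do |csv|
      csv << result_data[:headers]
      result_data[:rows].each { |row| csv << row }
    end
  when :native
    result_data
  else
    raise ArgumentError, "Unsupported format: #{format}"
  end
end

data = { headers: %w[id name], rows: [[1, "a"], [2, "b"]] }
p format_result(data, "Object")
```

Downcasing strings before `to_sym` is what lets callers pass either `"CSV"` or `:csv`.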
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.
# File 'lib/dwh/adapters/athena.rb', line 181
def execute_stream(sql, io, stats: nil, retries: 0)
  with_debug(sql) do
    with_retry(retries) do
      execute_query(sql, io: io, stats: stats)
    end
  end
rescue StandardError => e
  raise ExecutionError, "Athena streaming query failed: #{e.message}"
end
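#execute_stream delegates the actual writing to the private execute_query, which is not shown here. As a rough illustration of the documented contract (CSV written to the caller's IO, and that IO returned), here is a sketch. write_csv_stream and its pages argument are hypothetical stand-ins for however execute_query pages through results, and writing a header row first is an assumption.

```ruby
require "csv"
require "stringio"

# Hypothetical sketch: append CSV lines to the caller's IO page by page,
# then return the same IO object, as #execute_stream's signature suggests.
def write_csv_stream(headers, pages, io)
  # Assumption: a header row is written before the data rows
  io << CSV.generate_line(headers)
  pages.each do |page|
    # Each page is an Array of rows; CSV.generate_line handles quoting
    page.each { |row| io << CSV.generate_line(row) }
  end
  io
end

out = write_csv_stream(%w[id name], [[[1, "a"]], [[2, "b, c"]]], StringIO.new)
puts out.string
```

Any object that responds to `<<` works as the target, so a File opened for writing could replace the StringIO.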
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given table_name. Optionally pass in catalog and schema info.
Example:
  metadata("public.big_table")
  metadata("big_table")
  metadata("big_table", schema: "public")
# File 'lib/dwh/adapters/athena.rb', line 121
def metadata(table, **qualifiers)
  schema = qualifiers[:database] || qualifiers[:schema] || config[:database]
  catalog = qualifiers[:catalog] || config[:catalog]
  db_table = Table.new table, schema: schema, catalog: catalog

  sql = 'SELECT * FROM information_schema.columns'
  wheres = ["WHERE table_name = '#{db_table.physical_name}'"]
  wheres << "table_schema = '#{db_table.schema}'" if db_table.schema
  wheres << "table_catalog = '#{db_table.catalog}'" if db_table.catalog

  cols = execute("#{sql} \n #{wheres.join(' AND ')}", format: :object)

  cols.each do |col|
    # Athena DESCRIBE returns different column names than standard information_schema
    column_name = col['col_name'] || col['column_name']
    data_type = col['data_type']

    # Parse Athena data types (e.g., "varchar(255)", "decimal(10,2)")
    precision, scale = parse_data_type_precision(data_type)
    max_char_length = parse_char_length(data_type)

    db_table << Column.new(
      name: column_name,
      data_type: data_type,
      precision: precision,
      scale: scale,
      max_char_length: max_char_length
    )
  end

  db_table
end
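#metadata relies on parse_data_type_precision and parse_char_length, whose implementations are not part of this listing. The versions below are hypothetical, written only to show what those helpers need to extract from type strings like the ones named in the code comment ("varchar(255)", "decimal(10,2)"); the real helpers may cover more types.

```ruby
# Hypothetical parse_data_type_precision: returns [precision, scale] for
# decimal types, [nil, nil] for anything else.
def parse_data_type_precision(data_type)
  match = data_type.to_s.match(/\A\s*decimal\s*\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\)/i)
  return [nil, nil] unless match

  # Scale is optional, e.g. "decimal(38)"
  [match[1].to_i, match[2]&.to_i]
end

# Hypothetical parse_char_length: returns the length for char/varchar types,
# nil otherwise.
def parse_char_length(data_type)
  match = data_type.to_s.match(/\A\s*(?:var)?char\s*\(\s*(\d+)\s*\)/i)
  match && match[1].to_i
end

p parse_data_type_precision("decimal(10,2)") # => [10, 2]
p parse_char_length("varchar(255)")          # => 255
```

Returning nil for unparameterized types such as `bigint` or `string` keeps the Column attributes empty rather than guessing a size.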
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table. The result typically includes row_count, plus date_start and date_end when a date_column is given.
# File 'lib/dwh/adapters/athena.rb', line 97
def stats(table, date_column: nil, **qualifiers)
  database_name = qualifiers[:database] || config[:database]
  full_table_name = "#{database_name}.#{table}"

  sql_parts = ['SELECT COUNT(*) as row_count']

  if date_column
    sql_parts << ", MIN(#{date_column}) as date_start"
    sql_parts << ", MAX(#{date_column}) as date_end"
  end

  sql = "#{sql_parts.join} FROM #{full_table_name}"

  result = execute(sql, format: :object)
  first_row = result.first || {}

  TableStats.new(
    row_count: first_row['row_count'],
    date_start: first_row['date_start'],
    date_end: first_row['date_end']
  )
end
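The SQL that #stats sends can be reproduced without a connection. stats_sql below is a hypothetical helper that copies the string-building steps; the database, table, and column names are placeholders. The date bounds are only selected when date_column is given, so date_start and date_end come back nil otherwise.

```ruby
# Hypothetical helper reproducing the query string built by #stats.
def stats_sql(database_name, table, date_column: nil)
  sql_parts = ["SELECT COUNT(*) as row_count"]
  if date_column
    # Each fragment carries its own leading comma, so join uses no separator
    sql_parts << ", MIN(#{date_column}) as date_start"
    sql_parts << ", MAX(#{date_column}) as date_end"
  end
  "#{sql_parts.join} FROM #{database_name}.#{table}"
end

puts stats_sql("analytics", "events", date_column: "event_date")
```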
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields each result row to the given block.
# File 'lib/dwh/adapters/athena.rb', line 192
def stream(sql, &block)
  with_debug(sql) do
    result_data = execute_query(sql)

    result_data[:rows].each do |row|
      block.call(row)
    end
  end
end
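As the source shows, #stream calls execute_query and only then walks result_data[:rows], so every row has been fetched before the first block call. The toy class below mimics that shape to show the calling convention; FakeAthenaAdapter and fetch_all are hypothetical. For results too large to hold in memory, #execute_stream with an IO may be the better fit.

```ruby
# Hypothetical stand-in for the adapter; fetch_all plays the role of the
# private execute_query, returning every row before any is yielded.
class FakeAthenaAdapter
  def initialize(rows)
    @rows = rows
  end

  # Same shape as #stream: run the query, then call the block once per row.
  def stream(_sql, &block)
    fetch_all[:rows].each { |row| block.call(row) }
  end

  private

  def fetch_all
    { headers: %w[id], rows: @rows }
  end
end

seen = []
FakeAthenaAdapter.new([[1], [2], [3]]).stream("SELECT id FROM t") { |row| seen << row.first }
p seen # => [1, 2, 3]
```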
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db. Uses the catalog and schema from the configuration unless they are overridden via qualifiers.
# File 'lib/dwh/adapters/athena.rb', line 83
def tables(**qualifiers)
  schema = qualifiers[:database] || qualifiers[:schema] || config[:database]
  catalog = qualifiers[:catalog] || config[:catalog]

  sql = 'SELECT table_name FROM information_schema.tables'
  wheres = ['WHERE 1=1']
  wheres << "table_catalog = '#{catalog}'"
  wheres << "table_schema = '#{schema}'"

  result = execute("#{sql} #{wheres.join(' AND ')}", format: :array)
  result.flatten
end
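The query #tables builds can likewise be reproduced offline. tables_sql is a hypothetical helper and the catalog and schema names are placeholders. Because both conditions are appended unconditionally and Ruby interpolates nil as an empty string, a catalog or schema missing from both qualifiers and config produces a comparison against `''`.

```ruby
# Hypothetical helper reproducing the query string built by #tables.
def tables_sql(catalog, schema)
  wheres = ["WHERE 1=1"]
  # Both filters are always added, even when the value is nil
  wheres << "table_catalog = '#{catalog}'"
  wheres << "table_schema = '#{schema}'"
  "SELECT table_name FROM information_schema.tables #{wheres.join(' AND ')}"
end

puts tables_sql("awsdatacatalog", "analytics")

# execute(..., format: :array) yields one single-element row per table;
# flatten turns that into a plain list of names.
rows = [["events"], ["users"]]
p rows.flatten # => ["events", "users"]
```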
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database. Returns true if successful; on failure, raises ConnectionError when raise_exception is true, otherwise returns false.
# File 'lib/dwh/adapters/athena.rb', line 72
def test_connection(raise_exception: false)
  # Test connection by listing workgroups
  connection.list_work_groups(max_results: 1)
  true
rescue StandardError => e
  raise ConnectionError, "Athena connection test failed: #{e.message}" if raise_exception

  false
end
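The raise_exception flag in #test_connection switches between two failure modes. This sketch isolates that contract; the probe lambda stands in for `connection.list_work_groups(max_results: 1)`, and the top-level ConnectionError class is a hypothetical stand-in for the adapter's own error class.

```ruby
# Hypothetical stand-in for the adapter's ConnectionError
class ConnectionError < StandardError; end

# probe is any callable; in the adapter this is a cheap Athena API call.
def test_connection(probe, raise_exception: false)
  probe.call
  true
rescue StandardError => e
  # Re-raise with context only when the caller asked for it
  raise ConnectionError, "Athena connection test failed: #{e.message}" if raise_exception

  false
end

failing = -> { raise "AccessDenied" }
p test_connection(-> { :ok }) # => true
p test_connection(failing)    # => false
```

Returning false by default suits health checks, while `raise_exception: true` preserves the underlying error message for debugging.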
#valid_config? ⇒ Boolean
# File 'lib/dwh/adapters/athena.rb', line 202
def valid_config?
  super
  require 'aws-sdk-athena'
rescue LoadError
  raise ConfigError, "Required 'aws-sdk-athena' and 'aws-sdk-s3' gems missing. Please add them to your Gemfile."
end
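#valid_config? only attempts `require 'aws-sdk-athena'`, even though its error message names both gems. A check that reports each missing gem could look like the sketch below; missing_gems is a hypothetical helper, not part of DWH.

```ruby
# Hypothetical helper: try to require each library name and return the
# ones that raise LoadError.
def missing_gems(names)
  names.reject do |name|
    begin
      require name
      true
    rescue LoadError
      false
    end
  end
end

p missing_gems(%w[json definitely-not-a-real-gem]) # => ["definitely-not-a-real-gem"]
```

For this adapter one would call `missing_gems(%w[aws-sdk-athena aws-sdk-s3])` and raise a configuration error naming exactly the gems that are absent.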