Class: DWH::Adapters::Trino
- Defined in:
- lib/dwh/adapters/trino.rb
Overview
Trino adapter. This should work for Presto as well. This adapter requires the trino-client-ruby gem.
Create adapter instances using DWH.create.
Constant Summary
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Instance Method Summary collapse
- #connection ⇒ Object
-
#execute(sql, format: :array, retries: 2) ⇒ Array<Array>, ...
Execute sql on the target database.
-
#execute_stream(sql, io, stats: nil, retries: 1) ⇒ IO
Execute sql and stream responses back.
-
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given table_name.
- #schema? ⇒ Boolean
-
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table.
-
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
-
#table?(table, **qualifiers) ⇒ Boolean
Check if table exists in remote db.
-
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db.
- #test_connection(raise_exception: false) ⇒ Object
- #valid_config? ⇒ Boolean
Methods inherited from Adapter
#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
This class inherits a constructor from DWH::Adapters::Adapter
Instance Method Details
#connection ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/dwh/adapters/trino.rb', line 32
#
# Returns the Trino client for this adapter, building and memoizing it on
# first use from the adapter config merged with +extra_connection_params+.
#
# @return [Object] the memoized result of ::Trino::Client.new
# @raise [ConfigError] when building the client raises any StandardError;
#   the original error's message is carried over.
def connection
  return @connection if @connection

  # A truthy :ssl setting is replaced by `{ verify: false }`; a falsy one is
  # passed through unchanged.
  # NOTE(review): presumably `verify: false` turns off certificate
  # verification in the client -- confirm this is intended.
  ssl_setup = config[:ssl] ? { verify: false } : config[:ssl]

  properties = {
    server: "#{config[:host]}:#{config[:port]}",
    ssl: ssl_setup,
    schema: config[:schema],
    catalog: config[:catalog],
    user: config[:username],
    password: config[:password],
    query_timeout: config[:query_timeout],
    source: config[:client_name]
  }.merge(extra_connection_params)

  @connection = ::Trino::Client.new(properties)
rescue StandardError => e
  raise ConfigError, e.message
end
#execute(sql, format: :array, retries: 2) ⇒ Array<Array>, ...
Execute sql on the target database.
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/dwh/adapters/trino.rb', line 139
#
# Execute sql on the target database.
#
# The requested format is validated before the query is sent, so an
# unsupported format never costs a round trip to the warehouse.
#
# @param sql [String] statement to run
# @param format [Symbol] one of :native, :array, :object or :csv
#   - :native returns the client's run result untouched
#   - :array  returns element 1 of the run result
#     (NOTE(review): presumably the rows of a [columns, rows] pair -- confirm)
#   - :object returns the result of the client's run_with_names
#   - :csv    returns result_to_csv applied to the run result
# @param retries [Integer] passed to with_retry
# @return [Array<Array>, Object] depends on format, see above
# @raise [UnsupportedCapability] when format is not one of the supported values
# @raise [ExecutionError] when the client raises ::Trino::Client::TrinoQueryError
def execute(sql, format: :array, retries: 2)
  unless %i[native csv array object].include?(format)
    raise UnsupportedCapability,
          "Unknown format type: #{format}. Should be :native, :array, :object, or :csv"
  end

  result = with_debug(sql) do
    with_retry(retries) do
      format == :object ? connection.run_with_names(sql) : connection.run(sql)
    end
  end

  case format
  when :native, :object then result
  when :csv then result_to_csv(result)
  when :array then result[1]
  end
rescue ::Trino::Client::TrinoQueryError => e
  raise ExecutionError, e.message
end
#execute_stream(sql, io, stats: nil, retries: 1) ⇒ IO
Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/dwh/adapters/trino.rb', line 167
#
# Runs sql and writes the result to io as CSV: one header line built from
# the result's column names, then one line per row.
#
# @param sql [String] statement to run
# @param io [IO] target that receives the CSV text; it is rewound before
#   being returned
# @param stats [#<<, nil] optional collector; every row is appended to it
# @param retries [Integer] passed to with_retry
# @return [IO] the same io object, rewound
def execute_stream(sql, io, stats: nil, retries: 1)
  with_debug(sql) do
    with_retry(retries) do
      connection.query(sql) do |result|
        header = result.columns.map(&:name)
        io.write(CSV.generate_line(header))

        result.each_row do |record|
          stats << record if stats
          io << CSV.generate_line(record)
        end
      end
    end
  end

  io.rewind
  io
end
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given table_name. Pass in optional catalog and schema info.
Example: metadata("public.big_table") metadata("big_table") metadata("big_table",schema: "public")
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/dwh/adapters/trino.rb', line 117
#
# Get the schema structure of a given table. Runs `SHOW COLUMNS FROM ...`
# and appends one Column per returned row to a new Table.
#
# @example
#   metadata("public.big_table")
#   metadata("big_table")
#   metadata("big_table", schema: "public")
#
# @param table [String] table name, passed to Table.new
# @param qualifiers [Hash] optional qualifiers forwarded to Table.new
#   (e.g. schema:, as in the example above)
# @return [DWH::Table] the table with its columns populated
def metadata(table, **qualifiers)
  db_table = Table.new table, **qualifiers
  sql = "SHOW COLUMNS FROM #{db_table.fully_qualified_table_name}"

  # Only the second element of the native result is used here.
  _, cols = execute(sql, format: :native, retries: 1)

  cols.each do |col|
    # Any type starting with `row(` is reported as 'struct'.
    dt = col[1].start_with?('row(') ? 'struct' : col[1]
    db_table << Column.new(
      name: col[0],
      data_type: dt
    )
  end

  db_table
end
#schema? ⇒ Boolean
134 135 136 |
# File 'lib/dwh/adapters/trino.rb', line 134
#
# Whether the adapter config contains a :schema key. Only the presence of
# the key is checked, not its value.
#
# @return [Boolean]
def schema?
  config.key?(:schema)
end
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/dwh/adapters/trino.rb', line 97
#
# Returns basic stats of a given table: the row count and, when a
# date_column is given, the min and max of that column.
#
# @param table [String] table name, passed to Table.new
# @param date_column [String, nil] column used for the date range; when nil
#   only the count is selected and date_start/date_end end up nil
# @param qualifiers [Hash] optional qualifiers forwarded to Table.new
# @return [TableStats] built from the first row of the query result
def stats(table, date_column: nil, **qualifiers)
  db_table = Table.new(table, **qualifiers)

  sql = <<-SQL
          SELECT count(*) ROW_COUNT
              #{date_column.nil? ? nil : ", min(#{date_column}) DATE_START"}
              #{date_column.nil? ? nil : ", max(#{date_column}) DATE_END"}
          FROM #{db_table.fully_qualified_table_name}
  SQL

  first_row = execute(sql, retries: 1)[0]

  TableStats.new(
    row_count: first_row[0],
    date_start: first_row[1],
    date_end: first_row[2]
  )
end
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
185 186 187 188 189 190 191 |
# File 'lib/dwh/adapters/trino.rb', line 185
#
# Executes the given sql and hands every result row to the given block.
# Unlike execute and execute_stream, the query is not wrapped in with_retry.
#
# @param sql [String] statement to run
# @yield [row] called once per row via the result's each_row
# @return [Object] whatever with_debug returns for the wrapped query
def stream(sql, &block)
  with_debug(sql) do
    connection.query(sql) { |result| result.each_row(&block) }
  end
end
#table?(table, **qualifiers) ⇒ Boolean
Check if table exists in remote db.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/dwh/adapters/trino.rb', line 81
#
# Check if table exists in remote db by running `SHOW TABLES ... LIKE`.
#
# NOTE(review): the table name is used as a LIKE pattern, so `_` and `%`
# in the name presumably act as wildcards and could match other tables --
# confirm whether an exact-match check is needed.
#
# @param table [String] table name, passed to Table.new
# @param qualifiers [Hash] optional qualifiers forwarded to Table.new
# @return [Boolean] true when the query returns at least one row
def table?(table, **qualifiers)
  db_table = Table.new(table, **qualifiers)

  parts = ['SHOW TABLES']
  parts.push('FROM', db_table.fully_qualified_schema_name) if db_table.catalog_or_schema?
  parts << "LIKE '#{db_table.physical_name}'"

  matches = execute(parts.compact.join(' '), retries: 1)
  !matches.empty?
end
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db. It will use the default catalog and schema from the config unless they are specified here.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/dwh/adapters/trino.rb', line 64
#
# Get all tables available in the target db. A `FROM` clause is added only
# when a catalog and/or schema qualifier is passed; otherwise a bare
# `SHOW TABLES` is issued.
#
# @param qualifiers [Hash] optional :catalog and :schema
# @return [Array<String>] the flattened query result
def tables(**qualifiers)
  catalog = qualifiers[:catalog]
  schema = qualifiers[:schema]

  # With both qualifiers the target is "catalog.schema"; with one, it is
  # that single value. nil entries are dropped by compact below.
  target = catalog && schema ? ["#{catalog}.#{schema}"] : [catalog, schema]
  from_keyword = catalog || schema ? 'FROM' : nil

  parts = ['SHOW TABLES', from_keyword, *target]
  execute(parts.compact.join(' '), retries: 1).flatten
end
#test_connection(raise_exception: false) ⇒ Object
54 55 56 57 58 59 60 61 |
# File 'lib/dwh/adapters/trino.rb', line 54
#
# Verifies connectivity by running `select 1` through the client.
#
# @param raise_exception [Boolean] when true, a connection failure is
#   re-raised as ConnectionError instead of returning false
# @return [Boolean] true when the probe query runs; false when it fails
#   with ::Trino::Client::TrinoHttpError or Faraday::ConnectionFailed and
#   raise_exception is false
# @raise [ConnectionError] on those failures when raise_exception is true;
#   the original error's message is carried over
def test_connection(raise_exception: false)
  connection.run('select 1')
  true
rescue ::Trino::Client::TrinoHttpError, Faraday::ConnectionFailed => e
  raise ConnectionError, e.message if raise_exception

  false
end
#valid_config? ⇒ Boolean
193 194 195 196 197 198 |
# File 'lib/dwh/adapters/trino.rb', line 193
#
# Runs the inherited config validation, then makes sure the trino-client
# gem can be loaded.
#
# @return [Boolean] true once the inherited check has run and the gem is
#   loadable. The value is returned explicitly because `require` itself
#   returns false when the library was already loaded, which would make
#   this predicate report false for a perfectly valid setup.
# @raise [ConfigError] when the trino-client gem cannot be loaded
def valid_config?
  super
  # Deliberate lazy load: the gem is only needed when this adapter is used.
  require 'trino-client'
  true
rescue LoadError
  raise ConfigError, "Required 'trino-client' gem missing. Please add it to your Gemfile."
end