Class: DWH::Adapters::Trino

Inherits:
Adapter
  • Object
show all
Defined in:
lib/dwh/adapters/trino.rb

Overview

Trino adapter. This should work for Presto as well. This adapter requires the trino-client-ruby gem.

Create adapter instances using DWH.create.

Examples:

Basic connection with required only options

DWH.create(:trino, {host: 'localhost', catalog: 'native', username: 'Ajo'})

Connect with extra http headers

DWH.create(:trino, {host: 'localhost', port: 8080,
  catalog: 'native', username: 'Ajo',
  extra_connection_params: {
    http_headers: {
      'X-Trino-User' => 'True User Name',
      'X-Forwarded-Request' => '<request passed down from client>'
    }
  }
  })

Constant Summary

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Instance Method Summary collapse

Methods inherited from Adapter

#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

This class inherits a constructor from DWH::Adapters::Adapter

Instance Method Details

#connection ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/dwh/adapters/trino.rb', line 32

# Builds the Trino client on first use and memoizes it.
#
# SSL option handling:
# * a Hash in config[:ssl] is passed to the client unchanged, so callers can
#   supply their own options (for example verify: true, ca_file: ...);
# * any other truthy value is mapped to { verify: false } (legacy behaviour);
# * nil / false are passed through unchanged.
#
# extra_connection_params are merged last, so they override the keys built here.
#
# @return [Object] the client returned by ::Trino::Client.new
# @raise [ConfigError] when building the client raises any StandardError
def connection
  return @connection if @connection

  ssl_setup =
    case config[:ssl]
    when Hash then config[:ssl]
    when nil, false then config[:ssl]
    else { verify: false }
    end

  properties = {
    server: "#{config[:host]}:#{config[:port]}",
    ssl: ssl_setup,
    schema: config[:schema],
    catalog: config[:catalog],
    user: config[:username],
    password: config[:password],
    query_timeout: config[:query_timeout],
    source: config[:client_name]
  }.merge(extra_connection_params)

  @connection = ::Trino::Client.new(properties)
rescue StandardError => e
  raise ConfigError, e.message
end

#execute(sql, format: :array, retries: 2) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

    • array returns array of array
    • object returns array of Hashes
    • csv returns as csv
    • native returns the native result from any clients used
      • For example: Postgres using pg client will return PG::Result
      • HTTP clients will return the HTTP response object
  • retries (Integer) (defaults to: 2)

    number of retries in case of failure. Default is 2.

Returns:

  • (Array<Array>, Hash, CSV, Native)

Raises:



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/dwh/adapters/trino.rb', line 139

# Executes +sql+ on the Trino cluster and returns the result in the requested format.
#
# The format is validated before anything is sent to the cluster, so a typo in
# +format+ does not cost a (possibly expensive) query. String formats such as
# 'array' are accepted as documented and treated like their Symbol equivalents.
#
# @param sql [String] the SQL to run
# @param format [Symbol, String] :array (rows only), :object (rows as hashes via
#   run_with_names), :csv (via result_to_csv) or :native (the client's raw result)
# @param retries [Integer] passed to with_retry
# @return [Array<Array>, Array<Hash>, String, Object] depending on +format+
# @raise [UnsupportedCapability] when +format+ is not one of the supported values
# @raise [ExecutionError] when the client raises Trino::Client::TrinoQueryError
def execute(sql, format: :array, retries: 2)
  format = format.to_sym if format.is_a?(String)
  unless %i[native csv array object].include?(format)
    raise UnsupportedCapability, "Unknown format type: #{format}. Should be :native, :array, :object, or :csv"
  end

  result = with_debug(sql) do
    with_retry(retries) do
      # Only :object needs column names attached to every row.
      format == :object ? connection.run_with_names(sql) : connection.run(sql)
    end
  end

  case format
  when :csv then result_to_csv(result)
  when :array then result[1] # run returns [columns, rows]; keep only the rows
  else result # :native and :object are returned as produced by the client
  end
rescue ::Trino::Client::TrinoQueryError => e
  raise ExecutionError, e.message
end

#execute_stream(sql, io, stats: nil, retries: 1) ⇒ IO

Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.

Parameters:

  • sql (String)

    actual sql

  • io (IO)

    IO object to write records to

  • stats (StreamingStats) (defaults to: nil)

    optional collector for stats and preview data

  • retries (Integer) (defaults to: 1)

    number of retries in case of failure

Returns:

  • (IO)

Raises:



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/dwh/adapters/trino.rb', line 167

# Runs +sql+ and writes the result to +io+ as CSV.
#
# The first line written is a header built from the column names. Each row is
# then written as its own CSV line. When +stats+ is given, every row is also
# appended to it with <<. The IO is rewound before being returned, so it can be
# read from the start.
#
# NOTE(review): all writes happen inside the with_retry block. If with_retry
# re-runs the block after output has started, the header and any rows already
# written would be written again — confirm how with_retry behaves mid-stream.
#
# @param sql [String] the SQL to run
# @param io [IO] destination for the CSV output
# @param stats [#<<, nil] optional row collector
# @param retries [Integer] passed to with_retry
# @return [IO] the same +io+, rewound
def execute_stream(sql, io, stats: nil, retries: 1)
  with_debug(sql) do
    with_retry(retries) do
      connection.query(sql) do |query_result|
        header_names = query_result.columns.map(&:name)
        io.write(CSV.generate_line(header_names))

        query_result.each_row do |record|
          stats << record if stats
          io << CSV.generate_line(record)
        end
      end
    end
  end

  io.tap(&:rewind)
end

#metadata(table, **qualifiers) ⇒ DWH::Table

Get the schema structure of the given table. Pass in optional catalog and schema info.

Example: metadata("public.big_table") metadata("big_table") metadata("big_table",schema: "public")

Parameters:

  • table (String)
    • table name
  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent name space. will be ignored if the adapter doesn't support

  • :schema (String)

    optional schema to scope to. will be ignored if the adapter doesn't support

Returns:



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/dwh/adapters/trino.rb', line 117

# Describes the columns of +table+ using Trino's SHOW COLUMNS.
#
# Each result row contributes one Column. Its name comes from the first field
# and its type from the second field. ROW(...) types are recorded as 'struct'.
#
# @param table [String] table name, optionally qualified ("schema.table")
# @param qualifiers [Hash] optional :catalog / :schema passed to Table.new
# @return [DWH::Table] the table with its columns appended
def metadata(table, **qualifiers)
  db_table = Table.new(table, **qualifiers)
  sql = "SHOW COLUMNS FROM #{db_table.fully_qualified_table_name}"

  # :native yields the client's [columns, rows] pair; only the rows are needed.
  _, cols = execute(sql, format: :native, retries: 1)

  cols.each do |col|
    column_name, column_type = col
    data_type = column_type.to_s.start_with?('row(') ? 'struct' : column_type
    db_table << Column.new(name: column_name, data_type: data_type)
  end

  db_table
end

#schema? ⇒ Boolean

Returns:



134
135
136
# File 'lib/dwh/adapters/trino.rb', line 134

# Reports whether the adapter configuration contains a :schema entry.
#
# @return [Boolean] true when the :schema key is present, even if its value is nil
def schema?
  cfg = config
  cfg.key?(:schema)
end

#stats(table, date_column: nil, **qualifiers) ⇒ TableStats

Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.

Examples:

stats("public.big_table", date_column: "fact_date")
stats("big_table")
stats("big_table",schema: "public")

Parameters:

  • table (String)

    table name

  • date_column (String) (defaults to: nil)

    optional date column to use to find range

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent name space. will be ignored if the adapter doesn't support

  • :schema (String)

    optional schema to scope to. will be ignored if the adapter doesn't support

Returns:

Raises:



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/dwh/adapters/trino.rb', line 97

# Collects basic statistics for +table+.
#
# The row count is always computed. When +date_column+ is given, the minimum and
# maximum of that column are selected as well. Without a date column those two
# values come back as nil.
#
# @param table [String] table name, optionally qualified
# @param date_column [String, nil] column whose min/max become date_start/date_end
# @param qualifiers [Hash] optional :catalog / :schema passed to Table.new
# @return [TableStats] with row_count, date_start and date_end
def stats(table, date_column: nil, **qualifiers)
  db_table = Table.new(table, **qualifiers)
  range_start = date_column.nil? ? nil : ", min(#{date_column}) DATE_START"
  range_end = date_column.nil? ? nil : ", max(#{date_column}) DATE_END"
  sql = <<-SQL
              SELECT count(*) ROW_COUNT
                  #{range_start}
                  #{range_end}
              FROM #{db_table.fully_qualified_table_name}
  SQL

  first_row = execute(sql, retries: 1)[0]

  TableStats.new(
    date_start: first_row[1],
    date_end: first_row[2],
    row_count: first_row[0]
  )
end

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Parameters:

  • sql (String)

    actual sql

Yields:

  • (chunk)

    Yields a streamed chunk as it streams in. The chunk type might vary depending on the target db and settings

Raises:



185
186
187
188
189
190
191
# File 'lib/dwh/adapters/trino.rb', line 185

# Runs +sql+ and hands every result row to the given block as it arrives.
#
# @param sql [String] the SQL to run
# @yield [row] each row produced by the client's each_row
# @return [Object] whatever with_debug returns for the streamed query
def stream(sql, &block)
  with_debug(sql) do
    connection.query(sql) { |query_result| query_result.each_row(&block) }
  end
end

#table?(table, **qualifiers) ⇒ Boolean

Check if table exists in remote db.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent name space. will be ignored if the adapter doesn't support

  • :schema (String)

    optional schema to scope to. will be ignored if the adapter doesn't support

Returns:



81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/dwh/adapters/trino.rb', line 81

# Checks whether +table+ exists in the remote database.
#
# The check issues SHOW TABLES [FROM <schema>] LIKE '<name>' ESCAPE '\'.
# LIKE treats '_' and '%' as wildcards, so those characters (and the escape
# character itself) are escaped; otherwise "my_table" would also match
# "myXtable". Single quotes are doubled so the name cannot end the literal.
#
# @param table [String] table name, optionally qualified
# @param qualifiers [Hash] optional :catalog / :schema passed to Table.new
# @return [Boolean] true when SHOW TABLES returns at least one row
def table?(table, **qualifiers)
  db_table = Table.new(table, **qualifiers)

  pattern = db_table.physical_name.to_s
                    .gsub(/[\\_%]/) { |char| "\\#{char}" }
                    .gsub("'", "''")

  query = ['SHOW TABLES']

  if db_table.catalog_or_schema?
    query << 'FROM'
    query << db_table.fully_qualified_schema_name
  end
  query << "LIKE '#{pattern}' ESCAPE '\\'"

  rows = execute(query.compact.join(' '), retries: 1)
  !rows.empty?
end

#tables(**qualifiers) ⇒ Array<String>

Get all tables available in the target db. When no catalog or schema is given here, the connection's default catalog and schema are used.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent name space. will be ignored if the adapter doesn't support

  • :schema (String)

    optional schema to scope to. will be ignored if the adapter doesn't support

Returns:

  • (Array<String>)


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/dwh/adapters/trino.rb', line 64

# Lists the tables visible in the target database.
#
# Without qualifiers this runs SHOW TABLES against the session defaults. Trino
# interprets a bare name after FROM as a schema in the session catalog. So when
# only a catalog is given, it is paired with the configured default schema
# (config[:schema]), if there is one, to produce "catalog.schema".
#
# @param qualifiers [Hash] optional :catalog / :schema
# @return [Array<String>] table names
def tables(**qualifiers)
  catalog, schema = qualifiers.values_at(:catalog, :schema)
  schema ||= config[:schema] if catalog

  query = ['SHOW TABLES']
  query << 'FROM' if catalog || schema

  if catalog && schema
    query << "#{catalog}.#{schema}"
  else
    query << catalog
    query << schema
  end

  rows = execute(query.compact.join(' '), retries: 1)
  rows.flatten
end

#test_connection(raise_exception: false) ⇒ Boolean



54
55
56
57
58
59
60
61
# File 'lib/dwh/adapters/trino.rb', line 54

# Runs a trivial query to check that the cluster is reachable and accepts queries.
#
# Any Trino client error (HTTP, query or timeout) and any Faraday transport error
# (connection refused, timeout, SSL, ...) counts as a failed check. Previously
# only HTTP errors and refused connections were caught, so timeouts escaped.
#
# @param raise_exception [Boolean] raise ConnectionError instead of returning false
# @return [Boolean] true when `select 1` succeeds, false otherwise
# @raise [ConnectionError] when +raise_exception+ is true and the check fails
def test_connection(raise_exception: false)
  connection.run('select 1')
  true
rescue ::Trino::Client::TrinoError, Faraday::Error => e
  raise ConnectionError, e.message if raise_exception

  false
end

#valid_config? ⇒ Boolean

Returns:



193
194
195
196
197
198
# File 'lib/dwh/adapters/trino.rb', line 193

# Validates the adapter configuration and makes sure the trino-client gem loads.
#
# @return [Boolean] the result of the inherited validation
# @raise [ConfigError] when the trino-client gem cannot be loaded
def valid_config?
  # Keep the inherited result. `require` returns false once the gem is already
  # loaded, so returning its value would report a valid config as invalid.
  valid = super
  require 'trino-client'
  valid
rescue LoadError
  raise ConfigError, "Required 'trino-client' gem missing. Please add it to your Gemfile."
end