Class: DWH::Adapters::Postgres

Inherits:
Adapter
  • Object
Defined in:
lib/dwh/adapters/postgres.rb

Overview

Postgres adapter. Ensure the pg gem is available before using this adapter. Adapters should generally be created with DWH.create, passing the configuration as an options hash or argument list.

Examples:

Basic connection with only the required options

DWH.create(:postgres, {host: 'localhost', database: 'postgres',
  username: 'postgres'})

Connection with certificate-based SSL

DWH.create(:postgres, {host: 'localhost', database: 'postgres',
  username: 'postgres', ssl: true,
  extra_connection_params: { sslmode: 'require' }})

Valid sslmodes: disable, prefer, require, verify-ca, verify-full.
For modes requiring certificates, make sure you add the appropriate params
to extra_connection_params (e.g. sslrootcert, sslcert).

Connection sending custom application name

DWH.create(:postgres, {host: 'localhost', database: 'postgres',
  username: 'postgres', application_name: "Strata CLI" })

Direct Known Subclasses

Redshift

Constant Summary

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Instance Method Summary

Methods inherited from Adapter

#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

This class inherits a constructor from DWH::Adapters::Adapter

Instance Method Details

#connection ⇒ Object

Creates a connection to the target database, or reuses the one already opened, and returns the connection object.



# File 'lib/dwh/adapters/postgres.rb', line 35

def connection
  return @connection if @connection

  set_default_ssl_mode_if_needed

  properties = {
    host: config[:host],
    port: config[:port],
    dbname: config[:database],
    user: config[:username],
    password: config[:password],
    application_name: config[:client_name]
  }.merge(extra_connection_params)
  properties[:options] = "#{properties[:options]} -c statement_timeout=#{config[:query_timeout] * 1000}"

  @connection = PG.connect(properties)

  # this could be comma separated list
  @connection.exec("SET search_path TO #{config[:schema]}") if schema?

  @connection
rescue StandardError => e
  raise ConfigError, e.message
end
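
The way the connection properties are assembled can be tried out without a database. The helper below is a hypothetical sketch (build_properties is not part of DWH); it only mirrors the hash merge and the multiplication by 1000 from the listing above, which suggests query_timeout is given in seconds while Postgres reads statement_timeout in milliseconds.

```ruby
# Hypothetical helper mirroring how the listing builds the hash for PG.connect.
# Not part of DWH; no database connection is made.
def build_properties(config, extra_connection_params = {})
  properties = {
    host: config[:host],
    port: config[:port],
    dbname: config[:database],
    user: config[:username],
    password: config[:password],
    application_name: config[:client_name]
  }.merge(extra_connection_params)

  # Assumption: query_timeout is in seconds; statement_timeout takes milliseconds.
  timeout_ms = config[:query_timeout] * 1000
  properties[:options] = "#{properties[:options]} -c statement_timeout=#{timeout_ms}"
  properties
end

config = { host: 'localhost', database: 'postgres', username: 'postgres', query_timeout: 30 }
props = build_properties(config, { sslmode: 'require' })
puts props[:options]
puts props[:sslmode]
```

Because any existing :options value is interpolated first, options passed through extra_connection_params are kept and the timeout flag is appended to them.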

#date_add(unit, val, exp) ⇒ Object

Overrides the default date_add because Postgres does not support quarter as an interval unit. A quarter is converted to months, with val multiplied by 3.

Parameters:

  • unit (String)

    Should be one of day, month, quarter etc

  • val (String, Integer)

    The number of units to add

  • exp (String)

    The sql expression to modify



# File 'lib/dwh/adapters/postgres.rb', line 209

def date_add(unit, val, exp)
  if unit.downcase.strip == 'quarter'
    unit = 'months'
    val = val.to_i * 3
  end
  gsk(:date_add)
    .gsub(/@unit/i, unit)
    .gsub(/@val/i, val.to_s)
    .gsub(/@exp/i, exp)
end
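
A minimal sketch of the quarter conversion follows. The template string is an assumption made for illustration; the real one is looked up with gsk(:date_add) from the adapter settings and may look different. date_add_sketch is a hypothetical name.

```ruby
# ASSUMED template, for illustration only. The adapter reads the real
# template from its settings via gsk(:date_add).
DATE_ADD_TEMPLATE = "(@EXP + '@VAL @UNIT'::interval)"

def date_add_sketch(unit, val, exp)
  # Quarters are rewritten as three months each, as in the listing above.
  if unit.downcase.strip == 'quarter'
    unit = 'months'
    val = val.to_i * 3
  end
  DATE_ADD_TEMPLATE
    .gsub(/@unit/i, unit)
    .gsub(/@val/i, val.to_s)
    .gsub(/@exp/i, exp)
end

puts date_add_sketch('quarter', 2, 'order_date')
puts date_add_sketch('day', 7, 'order_date')
```

With this assumed template, two quarters become an interval of 6 months, while other units pass through unchanged.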

#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

    • array returns array of array
    • object returns array of Hashes
    • csv returns as csv
    • native returns the native result from any clients used
      • For example: Postgres using pg client will return PG::Result
      • HTTP clients will return the HTTP response object
  • retries (Integer) (defaults to: 0)

    number of retries in case of failure. Default is 0

Returns:

  • (Array<Array>, Hash, CSV, Native)

Raises:



# File 'lib/dwh/adapters/postgres.rb', line 150

def execute(sql, format: :array, retries: 0)
  begin
    result = with_debug(sql) { with_retry(retries) { connection.exec(sql) } }
  rescue StandardError => e
    raise ExecutionError, e.message
  end

  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array
    result.values
  when :object
    result.to_a
  when :csv
    result_to_csv(result)
  when :native
    result
  else
    raise UnsupportedCapability, "Unsupported format: #{format} for this #{name}"
  end
end
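
The format switch can be illustrated with a stand-in result object. FakeResult and format_result below are hypothetical; FakeResult only imitates the fields, values and to_a methods of PG::Result that the listing relies on, and the csv branch is left out to keep the sketch short.

```ruby
# Stand-in for PG::Result, exposing only what the format switch needs.
class FakeResult
  attr_reader :fields, :values

  def initialize(fields, values)
    @fields = fields
    @values = values
  end

  # Array of hashes keyed by column name, the same shape PG::Result#to_a returns.
  def to_a
    values.map { |row| fields.zip(row).to_h }
  end
end

# Hypothetical helper reproducing the case statement from the listing.
def format_result(result, format)
  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array  then result.values
  when :object then result.to_a
  when :native then result
  else raise ArgumentError, "Unsupported format: #{format}"
  end
end

result = FakeResult.new(%w[id name], [%w[1 Ada], %w[2 Grace]])
p format_result(result, :array)
p format_result(result, 'OBJECT')
```

Strings are downcased before conversion to a symbol, so 'OBJECT' and :object select the same branch.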

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO

Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.

Parameters:

  • sql (String)

    actual sql

  • io (IO)

    IO object to write records to

  • stats (StreamingStats) (defaults to: nil)

    collects stats and preview data; this is optional

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure

Returns:

  • (IO)

Raises:



# File 'lib/dwh/adapters/postgres.rb', line 173

def execute_stream(sql, io, stats: nil, retries: 0)
  with_debug(sql) do
    with_retry(retries) do
      connection.exec(sql) do |result|
        io.write(CSV.generate_line(result.fields))
        result.each_row do |row|
          stats << row unless stats.nil?
          io.write(CSV.generate_line(row))
        end
      end
    end
  end

  io.rewind
  io
rescue StandardError => e
  raise ExecutionError, e.message
end
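
The write loop can be sketched with a StringIO and plain arrays in place of a live result set. stream_to_csv is a hypothetical helper, and an Array stands in for StreamingStats on the assumption that the stats object only needs to respond to <<.

```ruby
require 'csv'
require 'stringio'

# Hypothetical helper: writes a header line, then one CSV line per row,
# rewinding the IO at the end as the listing above does.
def stream_to_csv(fields, rows, io, stats: nil)
  io.write(CSV.generate_line(fields))
  rows.each do |row|
    stats << row unless stats.nil?
    io.write(CSV.generate_line(row))
  end
  io.rewind
  io
end

preview = [] # stand-in for StreamingStats
io = stream_to_csv(%w[id name], [['1', 'Ada'], ['2', 'Grace, H.']], StringIO.new, stats: preview)
puts io.read
```

Since the IO is rewound before it is returned, the caller can read the CSV from the start right away.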

#metadata(table, **qualifiers) ⇒ DWH::Table

Get the schema structure of a given table_name. Pass in optional catalog and schema info.

Examples:

metadata("public.big_table")
metadata("big_table")
metadata("big_table", schema: "public")

Parameters:

  • table (String)
    • table name
  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:



# File 'lib/dwh/adapters/postgres.rb', line 113

def metadata(table, **qualifiers)
  db_table = Table.new table, schema: qualifiers[:schema]

  schema_where = ''
  if db_table.schema?
    schema_where = "AND table_schema = '#{db_table.schema}'"
  elsif schema?
    schema_where = "AND table_schema in (#{qualified_schema_name})"
  end

  sql = <<-SQL
              SELECT column_name, data_type, character_maximum_length, numeric_precision,numeric_scale
              FROM information_schema.columns
              WHERE table_name = '#{db_table.physical_name}'
              #{schema_where}
  SQL

  cols = execute(sql, format: 'object')
  cols.each do |col|
    db_table << Column.new(
      name: col['column_name'],
      data_type: col['data_type'],
      precision: col['numeric_precision'],
      scale: col['numeric_scale'],
      max_char_length: col['character_maximum_length']
    )
  end

  db_table
end
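
The mapping from information_schema rows to column objects can be sketched as follows. ColumnSketch is a hypothetical stand-in for DWH's Column class (only the keyword names are taken from the listing), and the row values are invented for illustration.

```ruby
# Hypothetical stand-in for DWH's Column class; only the keyword names
# come from the listing above.
ColumnSketch = Struct.new(:name, :data_type, :precision, :scale,
                          :max_char_length, keyword_init: true)

# Rows shaped like the output of execute(sql, format: 'object'):
# one Hash per column. The values are made up.
rows = [
  { 'column_name' => 'id', 'data_type' => 'integer',
    'character_maximum_length' => nil, 'numeric_precision' => '32', 'numeric_scale' => '0' },
  { 'column_name' => 'email', 'data_type' => 'character varying',
    'character_maximum_length' => '255', 'numeric_precision' => nil, 'numeric_scale' => nil }
]

columns = rows.map do |col|
  ColumnSketch.new(
    name: col['column_name'],
    data_type: col['data_type'],
    precision: col['numeric_precision'],
    scale: col['numeric_scale'],
    max_char_length: col['character_maximum_length']
  )
end

columns.each { |c| puts "#{c.name}: #{c.data_type}" }
```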

#schema? ⇒ Boolean

True if the configuration was set up with a schema.

Returns:



# File 'lib/dwh/adapters/postgres.rb', line 145

def schema?
  !config[:schema].nil? && !config[:schema]&.strip&.empty?
end
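
A short sketch of how the predicate treats missing, blank and set values. schema_configured? is a hypothetical free-standing copy of the check that takes the config hash as an argument.

```ruby
# Hypothetical copy of the check in the listing, with config passed in.
def schema_configured?(config)
  !config[:schema].nil? && !config[:schema]&.strip&.empty?
end

[nil, '', '   ', 'public'].each do |value|
  puts "#{value.inspect} => #{schema_configured?({ schema: value })}"
end
```

Only a non-blank string counts as a configured schema; nil and whitespace-only values do not.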

#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table

Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.

Examples:

stats("public.big_table", date_column: "fact_date")
stats("big_table")
stats("big_table",schema: "public")

Parameters:

  • table (String)

    table name

  • date_column (String) (defaults to: nil)

    optional date column to use to find range

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:

Raises:



# File 'lib/dwh/adapters/postgres.rb', line 95

def stats(table, date_column: nil, **qualifiers)
  table_name = qualifiers[:schema] ? "#{qualifiers[:schema]}.#{table}" : table
  sql = <<-SQL
              SELECT count(*) ROW_COUNT
                  #{date_column.nil? ? nil : ", min(#{date_column}) DATE_START"}
                  #{date_column.nil? ? nil : ", max(#{date_column}) DATE_END"}
              FROM "#{table_name}"
  SQL

  result = connection.exec(sql)
  TableStats.new(
    row_count: result.first['row_count'],
    date_start: result.first['date_start'],
    date_end: result.first['date_end']
  )
end
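
The query built by stats can be sketched as a plain string builder. stats_sql is a hypothetical helper that produces a single-line equivalent of the heredoc above, without the schema qualifier. Postgres folds unquoted aliases such as ROW_COUNT to lowercase, which is why the listing reads the result with lowercase keys.

```ruby
# Hypothetical helper: single-line equivalent of the heredoc in the listing.
def stats_sql(table_name, date_column: nil)
  select_list = ['count(*) ROW_COUNT']
  # The date range columns are only selected when a date column is given.
  unless date_column.nil?
    select_list << "min(#{date_column}) DATE_START"
    select_list << "max(#{date_column}) DATE_END"
  end
  %(SELECT #{select_list.join(', ')} FROM "#{table_name}")
end

puts stats_sql('big_table')
puts stats_sql('big_table', date_column: 'fact_date')
```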

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Parameters:

  • sql (String)

    actual sql

Yields:

  • (chunk)

    Yields a streamed chunk as it streams in. The chunk type might vary depending on the target db and settings

Raises:



# File 'lib/dwh/adapters/postgres.rb', line 193

def stream(sql, &block)
  with_debug(sql) do
    connection.exec(sql) do |result|
      result.each_row do |row|
        block.call(row)
      end
    end
  end
end
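
How a caller might consume streamed rows, sketched with an in-memory array instead of a live connection. stream_rows is a hypothetical stand-in; in the listing above each chunk is one row yielded by each_row.

```ruby
# Hypothetical stand-in for the adapter's stream method: yields each row
# to the block, one at a time.
def stream_rows(rows, &block)
  rows.each { |row| block.call(row) }
end

total = 0
stream_rows([%w[a 10], %w[b 32]]) do |row|
  # Values are strings in this sketch, so convert before adding.
  total += row[1].to_i
end
puts total
```

Processing rows inside the block keeps only the running total in memory instead of the full result set.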

#table?(table_name) ⇒ Boolean

Check if table exists in remote db.

Parameters:

  • table_name (String)

    name of the table to look for

Returns:



# File 'lib/dwh/adapters/postgres.rb', line 90

def table?(table_name)
  tables.include?(table_name)
end

#tables(**qualifiers) ⇒ Array<String>

Get all tables available in the target db. It will use the default catalog and schema from the config unless they are specified here.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace. Ignored if the adapter doesn't support it

  • :schema (String)

    optional schema to scope to. Ignored if the adapter doesn't support it

Returns:

  • (Array<String>)


# File 'lib/dwh/adapters/postgres.rb', line 71

def tables(**qualifiers)
  sql = if schema? || qualifiers[:schema]
          <<-SQL
                  SELECT table_name#{' '}
                  FROM information_schema.tables
                  WHERE table_schema in (#{qualified_schema_name(qualifiers)})
          SQL
        else
          <<-SQL
                  SELECT table_name
                  FROM information_schema.tables
          SQL
        end

  result = connection.exec(sql)
  result.values.flatten
end
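
The branch between the filtered and unfiltered query can be sketched as below. The qualified_schema_name helper is not shown on this page, so quoted_schema_list is purely an assumption about its behavior: it prefers an explicit :schema qualifier over the configured one and turns a comma-separated schema string into a quoted SQL list. tables_sql is likewise a hypothetical name.

```ruby
# ASSUMPTION: stand-in for qualified_schema_name, whose source is not shown.
# Prefers the :schema qualifier, then quotes each comma-separated schema.
def quoted_schema_list(config_schema, qualifiers = {})
  schema = qualifiers[:schema] || config_schema
  schema.split(',').map { |s| "'#{s.strip}'" }.join(', ')
end

# Hypothetical helper: picks the query with or without the schema filter.
def tables_sql(config_schema, **qualifiers)
  base = 'SELECT table_name FROM information_schema.tables'
  configured = !config_schema.nil? && !config_schema.strip.empty?
  return base unless configured || qualifiers[:schema]

  "#{base} WHERE table_schema in (#{quoted_schema_list(config_schema, qualifiers)})"
end

puts tables_sql(nil)
puts tables_sql('public, analytics')
puts tables_sql(nil, schema: 'sales')
```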

#test_connection(raise_exception: false) ⇒ Boolean

Tests the connection to the target database. Returns true if successful. On failure it returns false, or raises ConnectionError when raise_exception is true.

Returns:

Raises:



# File 'lib/dwh/adapters/postgres.rb', line 61

def test_connection(raise_exception: false)
  connection
  true
rescue StandardError => e
  raise ConnectionError, e.message if raise_exception

  false
end
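
The two failure modes can be sketched with a small stand-in class. ProbeSketch and the top-level ConnectionError defined here are hypothetical and only imitate the rescue logic of the listing; no database is contacted.

```ruby
# Stand-in for DWH's ConnectionError, defined here so the sketch runs alone.
class ConnectionError < StandardError; end

# Hypothetical class whose "connection" is whatever the given block does.
class ProbeSketch
  def initialize(&connector)
    @connector = connector
  end

  def connection
    @connector.call
  end

  def test_connection(raise_exception: false)
    connection
    true
  rescue StandardError => e
    raise ConnectionError, e.message if raise_exception

    false
  end
end

ok = ProbeSketch.new { :connected }
bad = ProbeSketch.new { raise 'connection refused' }

puts ok.test_connection
puts bad.test_connection
begin
  bad.test_connection(raise_exception: true)
rescue ConnectionError => e
  puts "ConnectionError: #{e.message}"
end
```

The default form suits health checks that only need a yes or no answer, while raise_exception: true surfaces the underlying error message.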

#valid_config? ⇒ Boolean

Returns:



# File 'lib/dwh/adapters/postgres.rb', line 220

def valid_config?
  super
  require 'pg'
rescue LoadError
  raise ConfigError, "Required 'pg' gem missing. Please add it to your Gemfile."
end
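
The lazy dependency check can be sketched on its own. require_gem! and the top-level ConfigError are hypothetical stand-ins; the pattern is the same as in the listing: attempt the require and translate LoadError into a configuration error.

```ruby
# Stand-in for DWH's ConfigError, defined here so the sketch runs alone.
class ConfigError < StandardError; end

# Hypothetical helper: loads a library or raises a descriptive ConfigError.
def require_gem!(gem_name)
  require gem_name
  true
rescue LoadError
  raise ConfigError, "Required '#{gem_name}' gem missing. Please add it to your Gemfile."
end

require_gem!('stringio') # part of the standard library, so this succeeds

begin
  require_gem!('surely_not_an_installed_gem_xyz')
rescue ConfigError => e
  puts e.message
end
```

The sketch returns true explicitly because Kernel#require returns false when the library was already loaded.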