Class: DWH::Adapters::Postgres
- Defined in:
- lib/dwh/adapters/postgres.rb
Overview
Postgres adapter. Ensure the pg gem is available before using this adapter. Adapters should generally be created with DWH.create, passing the configuration as an options hash or an argument list.
Direct Known Subclasses
Constant Summary
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Instance Method Summary
-
#connection ⇒ Object
Creates a connection to the target database and returns the connection object, reusing an existing connection if there is one.
-
#date_add(unit, val, exp) ⇒ Object
Overrides the default date_add because Postgres does not support quarter as an interval unit.
-
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
-
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back.
-
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given table_name.
-
#schema? ⇒ Boolean
True if the configuration was set up with a schema.
-
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table.
-
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
-
#table?(table_name) ⇒ Boolean
Check if table exists in remote db.
-
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db.
-
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database. Returns true if successful; otherwise raises a ConnectionError when raise_exception is true, or returns false.
- #valid_config? ⇒ Boolean
Methods inherited from Adapter
#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
This class inherits a constructor from DWH::Adapters::Adapter
Instance Method Details
#connection ⇒ Object
Creates a connection to the target database and returns the connection object, reusing an existing connection if there is one.

    # File 'lib/dwh/adapters/postgres.rb', line 35
    def connection
      return @connection if @connection

      set_default_ssl_mode_if_needed

      properties = {
        host: config[:host],
        port: config[:port],
        dbname: config[:database],
        user: config[:username],
        password: config[:password],
        application_name: config[:client_name]
      }.merge(extra_connection_params)
      properties[:options] = "#{properties[:options]} -c statement_timeout=#{config[:query_timeout] * 1000}"

      @connection = PG.connect(properties)

      # this could be comma separated list
      @connection.exec("SET search_path TO #{config[:schema]}") if schema?

      @connection
    rescue StandardError => e
      raise ConfigError, e.message
    end
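The way the connection properties are assembled can be tried without a database. The following is a minimal sketch, not the adapter's code: build_pg_properties is a hypothetical helper, and it assumes config[:query_timeout] is given in seconds, which the multiplication by 1000 in the listing suggests.

```ruby
# Hypothetical helper: builds the hash that would be handed to PG.connect,
# using the same config keys as the listing above.
def build_pg_properties(config, extra = {})
  properties = {
    host: config[:host],
    port: config[:port],
    dbname: config[:database],
    user: config[:username],
    password: config[:password],
    application_name: config[:client_name]
  }.merge(extra)

  # Assumption: query_timeout is in seconds; statement_timeout takes milliseconds.
  timeout_ms = config[:query_timeout] * 1000
  properties[:options] = "#{properties[:options]} -c statement_timeout=#{timeout_ms}"
  properties
end

props = build_pg_properties(
  { host: 'localhost', port: 5432, database: 'analytics', username: 'reporter', query_timeout: 30 },
  { options: '-c timezone=UTC' }
)
puts props[:options]
```

Any options supplied through the extra parameters are kept and the timeout setting is appended to them.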
#date_add(unit, val, exp) ⇒ Object
Overrides the default date_add because Postgres does not support quarter as an interval unit. A quarter is converted to three months before the template is filled in.

    # File 'lib/dwh/adapters/postgres.rb', line 209
    def date_add(unit, val, exp)
      if unit.downcase.strip == 'quarter'
        unit = 'months'
        val = val.to_i * 3
      end
      gsk(:date_add)
        .gsub(/@unit/i, unit)
        .gsub(/@val/i, val.to_s)
        .gsub(/@exp/i, exp)
    end
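To see the quarter conversion in isolation, here is a small sketch. The template string is an assumption made for illustration; in the adapter the real template is returned by gsk(:date_add) and may look different. date_add_sketch and DATE_ADD_TEMPLATE are hypothetical names.

```ruby
# Assumed template for illustration only; the adapter reads its own via gsk(:date_add).
DATE_ADD_TEMPLATE = "(@exp + INTERVAL '@val @unit')"

def date_add_sketch(unit, val, exp, template: DATE_ADD_TEMPLATE)
  # Postgres has no quarter interval unit, so express quarters as months.
  if unit.downcase.strip == 'quarter'
    unit = 'months'
    val = val.to_i * 3
  end

  template
    .gsub(/@unit/i, unit)
    .gsub(/@val/i, val.to_s)
    .gsub(/@exp/i, exp)
end

puts date_add_sketch('quarter', 2, 'order_date') # (order_date + INTERVAL '6 months')
puts date_add_sketch('day', 7, 'created_at')     # (created_at + INTERVAL '7 day')
```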
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
    # File 'lib/dwh/adapters/postgres.rb', line 150
    def execute(sql, format: :array, retries: 0)
      begin
        result = with_debug(sql) { with_retry(retries) { connection.exec(sql) } }
      rescue StandardError => e
        raise ExecutionError, e.message
      end

      format = format.downcase if format.is_a?(String)
      case format.to_sym
      when :array
        result.values
      when :object
        result.to_a
      when :csv
        result_to_csv(result)
      when :native
        result
      else
        raise UnsupportedCapability, "Unsupported format: #{format} for this #{name}"
      end
    end
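The format switch can be illustrated with a stand-in result object. This is a sketch under stated assumptions: FakeResult is a hypothetical class that mimics only the fields, values, and to_a calls of PG::Result, and the CSV branch is a guess at what result_to_csv does, since that helper is not shown here.

```ruby
require 'csv'

# Hypothetical stand-in for PG::Result with only the calls used below.
class FakeResult
  attr_reader :fields, :values

  def initialize(fields, values)
    @fields = fields
    @values = values
  end

  # One hash per row, keyed by column name.
  def to_a
    values.map { |row| fields.zip(row).to_h }
  end
end

def format_result(result, format)
  format = format.downcase if format.is_a?(String)
  case format.to_sym
  when :array  then result.values
  when :object then result.to_a
  when :csv
    # Assumed behaviour: header line followed by one line per row.
    CSV.generate do |csv|
      csv << result.fields
      result.values.each { |row| csv << row }
    end
  when :native then result
  else
    raise ArgumentError, "Unsupported format: #{format}"
  end
end

result = FakeResult.new(%w[id name], [%w[1 ada], %w[2 grace]])
p format_result(result, :array)
p format_result(result, 'object')
puts format_result(result, :csv)
```

Both symbols and strings are accepted for the format, and strings are downcased first, so 'CSV' and :csv select the same branch.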
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.

    # File 'lib/dwh/adapters/postgres.rb', line 173
    def execute_stream(sql, io, stats: nil, retries: 0)
      with_debug(sql) do
        with_retry(retries) do
          connection.exec(sql) do |result|
            io.write(CSV.generate_line(result.fields))
            result.each_row do |row|
              stats << row unless stats.nil?
              io.write(CSV.generate_line(row))
            end
          end
        end
      end

      io.rewind
      io
    rescue StandardError => e
      raise ExecutionError, e.message
    end
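The CSV writing step can be sketched without a database by feeding rows from an array. stream_rows_as_csv is a hypothetical helper, and a plain Array stands in for the stats collector, on the assumption that the collector only needs to respond to <<.

```ruby
require 'csv'
require 'stringio'

# Writes a header line and then one CSV line per row to io,
# optionally handing each row to a stats collector as well.
def stream_rows_as_csv(fields, rows, io, stats: nil)
  io.write(CSV.generate_line(fields))
  rows.each do |row|
    stats << row unless stats.nil?
    io.write(CSV.generate_line(row))
  end
  io.rewind # leave the IO positioned at the start for the caller
  io
end

seen = []
io = stream_rows_as_csv(%w[id city], [%w[1 Oslo], %w[2 Lima]], StringIO.new, stats: seen)
puts io.read
puts seen.size
```

Because the IO is rewound before it is returned, the caller can read the CSV from the beginning immediately.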
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given table_name. Pass in optional catalog and schema info.

Example:
    metadata("public.big_table")
    metadata("big_table")
    metadata("big_table", schema: "public")

    # File 'lib/dwh/adapters/postgres.rb', line 113
    def metadata(table, **qualifiers)
      db_table = Table.new table, schema: qualifiers[:schema]
      schema_where = ''
      if db_table.schema?
        schema_where = "AND table_schema = '#{db_table.schema}'"
      elsif schema?
        schema_where = "AND table_schema in (#{qualified_schema_name})"
      end

      sql = <<-SQL
        SELECT column_name, data_type, character_maximum_length, numeric_precision,numeric_scale
        FROM information_schema.columns
        WHERE table_name = '#{db_table.physical_name}'
        #{schema_where}
      SQL

      cols = execute(sql, format: 'object')
      cols.each do |col|
        db_table << Column.new(
          name: col['column_name'],
          data_type: col['data_type'],
          precision: col['numeric_precision'],
          scale: col['numeric_scale'],
          max_char_length: col['character_maximum_length']
        )
      end

      db_table
    end
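The schema filter chosen by this method can be sketched on its own. Both helpers below are hypothetical: quoted_schema_list is only a guess at what the adapter's qualified_schema_name produces (it is not shown here), and schema_filter mirrors the if/elsif in the listing with plain strings instead of a Table object.

```ruby
# Hypothetical stand-in for qualified_schema_name:
# turns "public, sales" into "'public','sales'".
def quoted_schema_list(schemas)
  schemas.split(',').map { |s| "'#{s.strip}'" }.join(',')
end

# A schema given with the table wins over the configured default schemas.
def schema_filter(table_schema: nil, default_schemas: nil)
  if table_schema && !table_schema.strip.empty?
    "AND table_schema = '#{table_schema}'"
  elsif default_schemas && !default_schemas.strip.empty?
    "AND table_schema in (#{quoted_schema_list(default_schemas)})"
  else
    ''
  end
end

puts schema_filter(table_schema: 'public')
puts schema_filter(default_schemas: 'public, sales')
```

As in the listing, values are interpolated directly into the SQL text, so they should come from trusted configuration rather than user input.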
#schema? ⇒ Boolean
True if the configuration was set up with a schema.

    # File 'lib/dwh/adapters/postgres.rb', line 145
    def schema?
      !config[:schema].nil? && !config[:schema]&.strip&.empty?
    end
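A quick way to see which configurations count as having a schema is to apply the same check to a plain hash. schema_configured? is a hypothetical name used only for this sketch.

```ruby
# Same test as schema?, applied to an explicit config hash.
def schema_configured?(config)
  !config[:schema].nil? && !config[:schema].strip.empty?
end

p schema_configured?({ schema: 'public' }) # true
p schema_configured?({ schema: '   ' })    # false
p schema_configured?({})                   # false
```

A blank or whitespace-only schema is treated the same as no schema at all.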
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.
    # File 'lib/dwh/adapters/postgres.rb', line 95
    def stats(table, date_column: nil, **qualifiers)
      table_name = qualifiers[:schema] ? "#{qualifiers[:schema]}.#{table}" : table
      sql = <<-SQL
        SELECT count(*) ROW_COUNT
        #{date_column.nil? ? nil : ", min(#{date_column}) DATE_START"}
        #{date_column.nil? ? nil : ", max(#{date_column}) DATE_END"}
        FROM "#{table_name}"
      SQL

      result = connection.exec(sql)
      TableStats.new(
        row_count: result.first['row_count'],
        date_start: result.first['date_start'],
        date_end: result.first['date_end']
      )
    end
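The query this method sends depends on whether a date column is given. The sketch below builds an equivalent single-line statement so the two shapes are easy to compare; stats_sql is a hypothetical helper, not part of the adapter.

```ruby
# Builds the stats query text; date bounds are only selected when a date column is named.
def stats_sql(table, date_column: nil, schema: nil)
  table_name = schema ? "#{schema}.#{table}" : table
  columns = ['count(*) ROW_COUNT']
  unless date_column.nil?
    columns << "min(#{date_column}) DATE_START"
    columns << "max(#{date_column}) DATE_END"
  end
  "SELECT #{columns.join(', ')} FROM \"#{table_name}\""
end

puts stats_sql('orders')
puts stats_sql('orders', date_column: 'created_at')
puts stats_sql('orders', schema: 'sales')
```

Postgres folds unquoted aliases to lowercase, which is why the listing reads the result with the keys row_count, date_start, and date_end. Note also that, as in the listing, a schema-qualified name ends up inside a single pair of double quotes, which Postgres treats as one identifier rather than as schema and table.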
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
    # File 'lib/dwh/adapters/postgres.rb', line 193
    def stream(sql, &block)
      with_debug(sql) do
        connection.exec(sql) do |result|
          result.each_row do |row|
            block.call(row)
          end
        end
      end
    end
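From the caller's side, this method behaves like a row iterator that takes a block. The sketch below imitates that contract with an in-memory array so it can run without a database; each_streamed_row is a hypothetical name, and returning an enumerator when no block is given is an addition of this sketch, not something the listing does.

```ruby
# Yields each row (an array of column values) to the caller's block.
def each_streamed_row(rows)
  return enum_for(:each_streamed_row, rows) unless block_given?

  rows.each { |row| yield row }
end

total = 0
each_streamed_row([['a', 10], ['b', 32]]) do |row|
  # Each row is processed as it arrives and does not need to be retained.
  total += row[1]
end
puts total # 42
```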
#table?(table_name) ⇒ Boolean
Check if table exists in remote db.
    # File 'lib/dwh/adapters/postgres.rb', line 90
    def table?(table_name)
      tables.include?(table_name)
    end
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db. It uses the default catalog and schema from the configuration unless they are specified here.

    # File 'lib/dwh/adapters/postgres.rb', line 71
    def tables(**qualifiers)
      sql = if schema? || qualifiers[:schema]
              <<-SQL
              SELECT table_name#{' '}
              FROM information_schema.tables
              WHERE table_schema in (#{qualified_schema_name(qualifiers)})
              SQL
            else
              <<-SQL
              SELECT table_name FROM information_schema.tables
              SQL
            end

      result = connection.exec(sql)
      result.values.flatten
    end
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database. Returns true if successful; otherwise raises a ConnectionError when raise_exception is true, or returns false.

    # File 'lib/dwh/adapters/postgres.rb', line 61
    def test_connection(raise_exception: false)
      connection
      true
    rescue StandardError => e
      raise ConnectionError, e.message if raise_exception

      false
    end
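The two failure modes can be tried with a block in place of a real connection attempt. This is a sketch only: check_connection and ConnectionErrorSketch are hypothetical names standing in for test_connection and the library's ConnectionError.

```ruby
# Stand-in for the library's ConnectionError.
class ConnectionErrorSketch < StandardError; end

# Runs the block as the "connection attempt" and reports the outcome.
def check_connection(raise_exception: false)
  yield
  true
rescue StandardError => e
  raise ConnectionErrorSketch, e.message if raise_exception

  false
end

p check_connection { :connected }                  # true
p check_connection { raise 'host unreachable' }    # false

begin
  check_connection(raise_exception: true) { raise 'host unreachable' }
rescue ConnectionErrorSketch => e
  puts "failed: #{e.message}"
end
```

Returning false suits health checks that only need a yes or no answer, while raise_exception: true keeps the underlying error message for diagnostics.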
#valid_config? ⇒ Boolean
    # File 'lib/dwh/adapters/postgres.rb', line 220
    def valid_config?
      super
      require 'pg'
    rescue LoadError
      raise ConfigError, "Required 'pg' gem missing. Please add it to your Gemfile."
    end
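The same load-on-demand pattern works for any optional dependency. In this sketch, require_optional_gem and ConfigErrorSketch are hypothetical names; the library raises its own ConfigError instead.

```ruby
# Stand-in for the library's ConfigError.
class ConfigErrorSketch < StandardError; end

# Loads a library lazily and turns a LoadError into a configuration error.
def require_optional_gem(name)
  require name
  true
rescue LoadError
  raise ConfigErrorSketch, "Required '#{name}' gem missing. Please add it to your Gemfile."
end

p require_optional_gem('json') # true, json ships with Ruby

begin
  require_optional_gem('no_such_gem_for_this_sketch')
rescue ConfigErrorSketch => e
  puts e.message
end
```

Deferring the require until the configuration is validated means the gem only has to be installed by users who actually select this adapter.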