Class: DWH::Adapters::MySql
- Defined in:
- lib/dwh/adapters/my_sql.rb
Overview
MySql Adapter. To use this adapter make sure you have the MySql2 Gem installed. You can also pass additional connection properties via Adapter#extra_connection_params config property.
MySql concept of database maps to schema in this adapter. This is only important for the metadata methods where you want to pull up tables from a different database (aka schema).
Constant Summary
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Instance Method Summary collapse
-
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self.
-
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
-
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back.
-
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given a given table_name.
-
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table.
-
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
-
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db.
-
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database and returns true if successful, or raise Exception or false connection object or self.
-
#truncate_date(unit, exp) ⇒ Object
Custom date truncation implementation.
Methods inherited from Adapter
#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
This class inherits a constructor from DWH::Adapters::Adapter
Instance Method Details
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/dwh/adapters/my_sql.rb', line 37 def connection return @connection if @connection set_default_ssl_mode_if_needed properties = { # Connection Settings host: config[:host], username: config[:username], password: config[:password], port: 3306, database: config[:database], # Timeout Settings connect_timeout: 10, read_timeout: config[:query_timeout], connect_attrs: { program: config[:client_name] } }.merge(extra_connection_params) @connection = Mysql2::Client.new(properties) rescue StandardError => e raise ConfigError, e. end |
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/dwh/adapters/my_sql.rb', line 134 def execute(sql, format: :array, retries: 0) begin as_param = %i[array object].include?(format) ? format : :array result = with_debug(sql) { with_retry(retries) { connection.query(sql, as: as_param) } } rescue StandardError => e raise ExecutionError, e. end format = format.downcase if format.is_a?(String) case format.to_sym when :array, :object result.to_a when :csv result_to_csv(result) when :native result else raise UnsupportedCapability, "Unsupported format: #{format} for this #{name}" end end |
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back. Data is writtent out in CSV format to the provided IO object.
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/dwh/adapters/my_sql.rb', line 156 def execute_stream(sql, io, stats: nil, retries: 0) with_debug(sql) do with_retry(retries) do result = connection.query(sql, stream: true, as: :array, cache_rows: false) io.write(CSV.generate_line(result.fields)) result.each do |row| io.write(CSV.generate_line(row)) stats << row if stats end end end io.rewind io rescue StandardError => e raise ExecutionError, e. end |
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given a given table_name. Pass in optional catalog and schema info.
Example: metadata("public.big_table") metadata("big_table") metadata("big_table",schema: "public")
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/dwh/adapters/my_sql.rb', line 108 def (table, **qualifiers) db_table = Table.new table, schema: qualifiers[:schema] schema_where = db_table.schema ? " AND table_schema = '#{db_table.schema}'" : '' sql = <<-SQL SELECT column_name, data_type, character_maximum_length, numeric_precision,numeric_scale FROM information_schema.columns WHERE lower(table_name) = lower('#{db_table.physical_name}') #{schema_where} SQL cols = execute(sql, format: :object) cols.each do |col| db_table << Column.new( name: col['COLUMN_NAME'], data_type: col['DATA_TYPE'], precision: col['NUMERIC_PRECISION'], scale: col['NUMERIC_SCALE'], max_char_length: col['CHARACTER_MAXIMUM_LENGTH'] ) end db_table end |
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/dwh/adapters/my_sql.rb', line 89 def stats(table, date_column: nil, **qualifiers) table = "#{qualifiers[:schema]}.#{table}" if qualifiers[:schema] sql = <<-SQL SELECT count(*) row_count #{date_column.nil? ? nil : ", min(#{date_column}) date_start"} #{date_column.nil? ? nil : ", max(#{date_column}) date_end"} FROM #{table} SQL result = connection.query(sql) TableStats.new( row_count: result.first['row_count'], date_start: result.first['date_start'], date_end: result.first['date_end'] ) end |
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
175 176 177 178 179 180 181 182 |
# File 'lib/dwh/adapters/my_sql.rb', line 175 def stream(sql, &block) with_debug(sql) do result = connection.query(sql, as: :array, cache_rows: false) result.each do |row| block.call(row) end end end |
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db. It will use the default catalog and schema config only specified here.
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/dwh/adapters/my_sql.rb', line 74 def tables(**qualifiers) schema = qualifiers[:schema] || config[:database] query = " SELECT t.table_name FROM information_schema.tables t WHERE t.table_schema = '#{schema}' ORDER BY t.table_name " res = connection.query(query, as: :array) res.to_a.flatten end |
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database and returns true if successful, or raise Exception or false connection object or self
64 65 66 67 68 69 70 71 |
# File 'lib/dwh/adapters/my_sql.rb', line 64 def test_connection(raise_exception: false) connection true rescue StandardError => e raise ConnectionError, e. if raise_exception false end |
#truncate_date(unit, exp) ⇒ Object
Custom date truncation implementation. MySql doesn't offer a native function. We basially have to format it and convert back to date object.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/dwh/adapters/my_sql.rb', line 188 def truncate_date(unit, exp) unit = unit.strip.downcase case unit when 'year' "DATE(DATE_FORMAT(#{exp}, '%Y-01-01'))" when 'quarter' "DATE(DATE_ADD(DATE_FORMAT(#{exp}, '%Y-01-01'), INTERVAL (QUARTER(#{exp}) - 1) * 3 MONTH))" when 'month' "DATE(DATE_FORMAT(#{exp}, '%Y-%m-01'))" when 'week' gsk("#{settings[:week_start_day].downcase}_week_start_day") .gsub(/@exp/i, exp) when 'day', 'date' "DATE(#{exp})" when 'hour' "TIMESTAMP(DATE_FORMAT(#{exp}, '%Y-%m-%d %H:00:00'))" else raise UnsupportedCapability, "Currently not supporting truncation at #{unit} level" end end |