Advanced Usage Guide

This guide covers advanced features and usage patterns for DWH, including function translation, streaming, connection pooling, and performance optimization.

SQL Function Translation

DWH provides automatic translation of common SQL functions to database-specific syntax. This allows you to write database-agnostic code while leveraging native optimizations.
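
The translation helpers return SQL fragments (the per-database renderings are shown in the comments throughout this section), so they can be interpolated into larger queries. A minimal sketch of that pattern, assuming an illustrative orders table:

# Group orders by calendar month using the adapter's native truncation syntax
month = adapter.truncate_date('month', 'order_date')
results = adapter.execute(<<~SQL)
  SELECT #{month} AS order_month, COUNT(*) AS order_count
  FROM orders
  GROUP BY #{month}
  ORDER BY #{month}
SQL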

Date Functions

Date Truncation

# Truncate dates to different periods
adapter.truncate_date('week', 'created_at')
# PostgreSQL: DATE_TRUNC('week', created_at)
# SQL Server: DATETRUNC(week, created_at)
# MySQL: DATE_FORMAT(created_at, '%Y-%m-%d') combined with extra logic (week truncation is more involved in MySQL)

adapter.truncate_date('month', 'order_date')
adapter.truncate_date('year', 'signup_date')

Date Literals

# Create database-specific date literals
adapter.date_literal('2025-01-01')
# PostgreSQL: '2025-01-01'::DATE
# SQL Server: '2025-01-01'
# MySQL: '2025-01-01'

adapter.date_time_literal('2025-01-01 10:30:00')
# PostgreSQL: '2025-01-01 10:30:00'::TIMESTAMP
# SQL Server: '2025-01-01 10:30:00'

Date Arithmetic

# Add/subtract time periods
adapter.date_add('created_at', 30, 'day')
# PostgreSQL: (created_at + '30 day'::interval)
# SQL Server: DATEADD(day, 30, created_at)
# MySQL: TIMESTAMPADD(day, 30, created_at)

adapter.date_diff('end_date', 'start_date', 'day')
# Difference between end_date and start_date in the specified unit

Date Extraction

# Extract date parts
adapter.extract_year('created_at')
# PostgreSQL: extract(year from created_at)
# SQL Server: DATEPART(year, created_at)

adapter.extract_month('created_at')
adapter.extract_day_of_week('created_at')
adapter.extract_quarter('created_at')

String Functions

# String manipulation
adapter.trim('column_name')        # Remove whitespace
adapter.upper_case('column_name')  # Convert to uppercase
adapter.lower_case('column_name')  # Convert to lowercase

# Quoting and literals
adapter.quote('column_name')       # Database-specific column quoting
adapter.string_literal('value')    # Database-specific string literals
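
These helpers compose with quoting and literals when building predicates by hand; a small sketch, assuming an illustrative users table with an email column:

# Case-insensitive match built from translated fragments
email_col = adapter.quote('email')
email_val = adapter.string_literal('user@example.com')
results = adapter.execute("SELECT id FROM users WHERE #{adapter.lower_case(email_col)} = #{email_val}")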

Null Handling

# Null value handling
adapter.if_null('column1', "'default'")
# PostgreSQL: COALESCE(column1, 'default')
# SQL Server: ISNULL(column1, 'default')

adapter.null_if('column1', "'empty'")
# Returns NULL if column1 equals 'empty'

adapter.null_if_zero('numeric_column')
# Returns NULL if numeric_column equals 0
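
Null handling is useful for guarding arithmetic, for example converting zero denominators to NULL and then defaulting the result; a sketch with illustrative table and column names:

# Safe division: NULL out zero denominators, then default the result to 0
safe_qty  = adapter.null_if_zero('quantity')
unit_cost = adapter.if_null("revenue / #{safe_qty}", '0')
adapter.execute("SELECT id, #{unit_cost} AS unit_cost FROM order_lines")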

Array Functions

Available for databases that support array operations (PostgreSQL, Druid):

# Check if array contains any values from a list
adapter.array_in_list('tags', "'tech', 'science'")
# PostgreSQL: tags && ARRAY['tech', 'science']
# Druid: MV_OVERLAP(tags, ARRAY['tech', 'science'])

# Check if array excludes all values from a list
adapter.array_exclude_list('categories', "'spam', 'test'")

# Unnest/explode array for joins
adapter.array_unnest_join('tags', 'tag_alias')
# PostgreSQL: CROSS JOIN UNNEST(tags) AS tag_alias
# Druid: CROSS JOIN UNNEST(MV_TO_ARRAY(tags)) tag_alias
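
Array predicates drop into WHERE clauses like any other fragment; pairing them with the capability check keeps the code portable. A sketch, assuming an illustrative articles table:

# Only build the array filter when the database supports it
if adapter.supports_array_functions?
  predicate = adapter.array_in_list('tags', "'tech', 'science'")
  results = adapter.execute("SELECT id, title FROM articles WHERE #{predicate}")
end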

Type Casting

# Database-specific type casting
adapter.cast('column_name', 'INTEGER')
# PostgreSQL: column_name::INTEGER
# SQL Server: CAST(column_name AS INTEGER)
# MySQL: CAST(column_name AS SIGNED)
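
Casting composes with the other helpers, for instance to coerce a text column before aggregating; a sketch, assuming an illustrative payments table:

# Sum a numeric value stored as text
amount = adapter.cast('amount_text', 'INTEGER')
adapter.execute("SELECT SUM(#{amount}) FROM payments")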

Streaming and Large Result Sets

Basic Streaming

# Stream results directly to a file
File.open('large_export.csv', 'w') do |file|
  adapter.execute_stream("SELECT * FROM large_table", file)
end

# Stream with custom processing
adapter.stream("SELECT * FROM large_table") do |chunk|
  # Process each chunk as it arrives
  process_data_chunk(chunk)
end
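
Because execute_stream writes into whatever object you hand it, any IO-like target should work. For example (assuming only #write is called on the target), streaming straight into a compressed file:

require 'zlib'

# Stream query results directly into a gzip-compressed CSV
Zlib::GzipWriter.open('large_export.csv.gz') do |gz|
  adapter.execute_stream("SELECT * FROM large_table", gz)
end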

Streaming with Statistics

# Create streaming stats collector
stats = DWH::StreamingStats.new(10_000)  # Keep 10k rows in memory for preview

# Stream with stats tracking
File.open('export.csv', 'w') do |file|
  exec_thread = adapter.execute_stream("SELECT * FROM large_table", file, stats: stats)

  # Monitor progress in another thread
  Thread.new do
    loop do
      puts "Processed: #{stats.total_rows} rows"
      puts "Preview size: #{stats.data.size} rows"
      puts "Max row size: #{stats.max_row_size} bytes"
      sleep(5)
      break unless exec_thread.alive?
    end
  end

  # Wait for streaming to finish before the file is closed
  exec_thread.join
end

# Access collected statistics
puts "Final count: #{stats.total_rows}"
puts "Sample data: #{stats.data.first(5)}"

Memory Management

# Configure streaming stats memory usage
stats = DWH::StreamingStats.new(50_000)  # Keep more data for larger previews

# Reset stats for reuse
stats.reset

# Manual memory management
stats.add_row(['col1', 'col2', 'col3'])
current_data = stats.data  # Thread-safe access

Connection Pooling

Creating Connection Pools

# Create a named connection pool
pool = DWH.pool('analytics_pool', :postgres, {
  host: 'localhost',
  database: 'analytics',
  username: 'analyst',
  password: 'password'
}, size: 10, timeout: 5)

# Multiple pools for different databases
etl_pool = DWH.pool('etl_pool', :postgres, etl_config, size: 5)
reporting_pool = DWH.pool('reporting_pool', :mysql, reporting_config, size: 15)

Using Connection Pools

# Basic pool usage
pool.with do |connection|
  results = connection.execute("SELECT COUNT(*) FROM users")
  # additional queries can reuse this same pooled connection
end

# Nested pool operations
pool.with do |conn1|
  users = conn1.execute("SELECT id FROM users LIMIT 100")

  pool.with do |conn2|  # Gets different connection from pool
    orders = conn2.execute("SELECT * FROM orders WHERE user_id IN (?)", 
                          users.map(&:first))
  end
end
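
Pools are most useful when work is spread across threads, since each thread checks out its own connection and returns it when the block exits. A minimal sketch (the events table and shard column are illustrative):

# Run several queries concurrently, one pooled connection per thread
threads = 4.times.map do |shard|
  Thread.new do
    pool.with do |conn|
      conn.execute("SELECT COUNT(*) FROM events WHERE shard = #{shard}")
    end
  end
end
counts = threads.map(&:value)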

Pool Management

# Check pool status
puts "Pool size: #{pool.size}"
puts "Available connections: #{pool.available}"
puts "Active connections: #{pool.in_use}"

# Graceful shutdown
DWH.shutdown('analytics_pool')

# Shutdown all pools
DWH.shutdown_all

Database Capabilities Detection

Checking Capabilities

# Check what features are supported
if adapter.supports_window_functions?
  query = "SELECT name, ROW_NUMBER() OVER (ORDER BY created_at) FROM users"
  results = adapter.execute(query)
end

Available Capability Checks

  • supports_table_join? - Basic JOIN support
  • supports_full_join? - FULL OUTER JOIN support
  • supports_cross_join? - CROSS JOIN support
  • supports_sub_queries? - Subquery support
  • supports_common_table_expressions? - CTE support
  • supports_temp_tables? - Temporary table support
  • supports_window_functions? - Window function support
  • supports_array_functions? - Array operation support
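
Capability checks make it straightforward to pick a fallback strategy per database; a sketch that prefers a CTE when available and falls back to an inline subquery (the query itself is illustrative):

# Prefer a CTE, fall back to an inline subquery
recent = "SELECT * FROM orders WHERE order_date >= '2025-01-01'"
sql =
  if adapter.supports_common_table_expressions?
    "WITH recent AS (#{recent}) SELECT COUNT(*) FROM recent"
  else
    "SELECT COUNT(*) FROM (#{recent}) AS recent"
  end
adapter.execute(sql)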

Performance Optimization

Query Timeouts

# Set query timeouts per adapter
postgres = DWH.create(:postgres, {
  host: 'localhost',
  database: 'mydb',
  username: 'user',
  query_timeout: 1800  # 30 minutes
})

# For long-running analytical queries
druid = DWH.create(:druid, {
  host: 'localhost',
  port: 8080,
  protocol: 'http',
  query_timeout: 3600  # 1 hour
})

Result Format Optimization

# Choose appropriate result format for your use case
arrays = adapter.execute(sql, format: :array)    # Fastest, least memory
objects = adapter.execute(sql, format: :object)  # Hash access, more memory
csv = adapter.execute(sql, format: :csv)         # String format
native = adapter.execute(sql, format: :native)   # Database's native format

Streaming for Large Results

# Use streaming for large result sets
def export_large_table(adapter, table_name, output_file)
  query = "SELECT * FROM #{table_name}"

  File.open(output_file, 'w') do |file|
    adapter.execute_stream(query, file)
  end
end

# Chunk processing for memory efficiency
def process_large_dataset(adapter, query)
  adapter.stream(query) do |chunk|
    # Process each chunk immediately
    # Avoids loading entire result set into memory
    chunk.each { |row| process_row(row) }
  end
end

SQLite Performance Tuning

The SQLite adapter ships with defaults optimized for analytical workloads, but it can be tuned further:

# High-performance SQLite configuration for analytics
sqlite = DWH.create(:sqlite, {
  file: '/path/to/large_analytics.db',
  enable_wal: true,           # WAL mode for concurrent reads (default: true)
  timeout: 30000,             # 30 second busy timeout for heavy writes
  pragmas: {
    cache_size: -512000,      # 512MB cache for large datasets
    page_size: 8192,          # Larger pages for sequential scans
    mmap_size: 1073741824,    # 1GB memory-mapped I/O
    temp_store: 'MEMORY',     # Keep temp tables in RAM
    synchronous: 'NORMAL',    # Balance safety/speed (safe with WAL)
    journal_size_limit: 67108864  # 64MB journal limit
  }
})

# Read-only analytics queries with maximum performance
readonly_analytics = DWH.create(:sqlite, {
  file: '/path/to/data.db',
  readonly: true,             # Read-only for maximum concurrency
  pragmas: {
    cache_size: -256000,      # 256MB cache
    mmap_size: 2147483648,    # 2GB memory mapping for large files
    temp_store: 'MEMORY'      # Fast temp operations
  }
})

Error Handling and Debugging

Comprehensive Error Handling

begin
  results = adapter.execute("SELECT * FROM table")
rescue DWH::ExecutionError => e
  # Query execution failed
  puts "Query failed: #{e.message}"
  puts "SQL: #{e.sql}" if e.respond_to?(:sql)
rescue DWH::ConnectionError => e
  # Connection issues
  puts "Connection failed: #{e.message}"
  # Implement retry logic
rescue DWH::ConfigError => e
  # Configuration problems
  puts "Configuration error: #{e.message}"
rescue DWH::UnsupportedCapability => e
  # Attempted unsupported operation
  puts "Feature not supported: #{e.message}"
end
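
Connection errors are often transient, so a bounded retry with backoff is usually enough; a sketch (the retry count and delays are arbitrary):

# Retry transient connection failures up to three times with backoff
attempts = 0
begin
  results = adapter.execute("SELECT * FROM table")
rescue DWH::ConnectionError
  attempts += 1
  raise if attempts > 3
  sleep(2**attempts)  # back off: 2s, 4s, 8s
  retry
end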

Custom Settings and Overrides

Runtime Settings Modification

# Create adapter with custom settings
adapter = DWH.create(:postgres, {
  host: 'localhost',
  database: 'mydb',
  username: 'user',
  settings: {
    quote: '`@exp`',  # Use backticks instead of double quotes
    supports_window_functions: false,  # Force disable window functions
    temp_table_type: 'cte'  # Prefer CTEs over subqueries
  }
})

# Modify settings at runtime
adapter.alter_settings({
  supports_full_join: false,  # Disable FULL JOINs
  final_pass_measure_join_type: 'inner'  # Use INNER JOINs
})

# Reset to original settings
adapter.reset_settings

Custom Function Mappings

# Override specific function translations
adapter = DWH.create(:postgres, {
  host: 'localhost',
  database: 'mydb',
  username: 'user',
  settings: {
    truncate_date: "DATE_TRUNC('@unit', @exp)",  # Custom date truncation
    cast: "@exp::@type",  # PostgreSQL-style casting
    null_if: "CASE WHEN @exp = @target THEN NULL ELSE @exp END"  # Custom NULLIF
  }
})
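
The @exp, @unit, @type, and @target tokens in these templates are placeholders that the translation methods substitute with their arguments, so with the overrides above the calls would render roughly as follows (output shown for illustration):

adapter.truncate_date('month', 'order_date')
# => "DATE_TRUNC('month', order_date)"

adapter.cast('total', 'NUMERIC')
# => "total::NUMERIC"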