Class: DWH::StreamingStats

Inherits:
Object
Defined in:
lib/dwh/streaming_stats.rb

Overview

Basic streaming stats collector. It is used when running a query via the execute_streaming method. As data comes in, it is written to the stats object, which can be read from another thread to update a UI.
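As a rough illustration of the cross-thread usage described above, the sketch below has one thread append rows while another polls the row count. The StreamingStats class here is a trimmed local copy of the source listed on this page so the snippet runs without the gem, and the producer thread is a hypothetical stand-in for whatever execute_streaming does internally.

```ruby
# Trimmed local copy of the class documented on this page (an assumption
# made so the sketch is self-contained; the real class lives in DWH).
class StreamingStats
  attr_reader :in_memory_limit

  def initialize(limit = 20_000)
    @in_memory_limit = limit
    @mutex = Mutex.new
    @total_rows = 0
    @data = []
  end

  def <<(row)
    raise ArgumentError, 'Row must be an array' unless row.is_a?(Array)

    @mutex.synchronize do
      @data << row unless @data.size >= @in_memory_limit
      @total_rows += 1
    end
  end

  def total_rows
    @mutex.synchronize { @total_rows }
  end
end

stats = StreamingStats.new(100)

# Hypothetical producer: plays the role of the streaming query.
producer = Thread.new do
  1_000.times { |i| stats << [i, "name-#{i}"] }
end

# Reader: polls the counter the way a UI refresh loop might.
samples = []
reader = Thread.new do
  while producer.alive?
    samples << stats.total_rows
    sleep 0.001
  end
end

producer.join
reader.join
puts "rows streamed: #{stats.total_rows}"
```

Because every read and write goes through the same mutex, the reader only ever sees a consistent count, although the number of samples it collects depends on thread scheduling.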

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(limit = 20_000) ⇒ StreamingStats

Returns a new instance of StreamingStats.



# File 'lib/dwh/streaming_stats.rb', line 13

def initialize(limit = 20_000)
  @status = :init
  @in_memory_limit = limit
  @mutex = Mutex.new
  reset
end

Instance Attribute Details

#in_memory_limit ⇒ Object (readonly)

In most cases the streaming data is written to a tempfile rather than held in memory. In those cases, only a limited number of rows is kept in memory, for previewing or other quick introspection. The default limit is 20,000 rows.



# File 'lib/dwh/streaming_stats.rb', line 11

def in_memory_limit
  @in_memory_limit
end

Instance Method Details

#<<(row) ⇒ Object

Adds a single row to the in-memory dataset. Once the limit is reached, further rows are no longer stored, but they still count toward total_rows and max_row_size.

Parameters:

  • row (Array)

Raises:

  • (ArgumentError) if row is not an Array


# File 'lib/dwh/streaming_stats.rb', line 33

def <<(row)
  raise ArgumentError, 'Row must be an array' unless row.is_a?(Array)

  @mutex.synchronize do
    @data << row unless @data.size >= @in_memory_limit
    @total_rows += 1
    @max_row_size = [@max_row_size, row.to_s.bytesize].max
  end
end
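The following sketch shows how the limit behaves when more rows arrive than fit in memory. It uses a local copy of the relevant methods from this page so it can run on its own; the limit of 3 is an arbitrary value chosen for illustration.

```ruby
# Local copy of the methods shown on this page (assumption: kept minimal
# so the example runs without the DWH gem).
class StreamingStats
  attr_reader :in_memory_limit

  def initialize(limit = 20_000)
    @in_memory_limit = limit
    @mutex = Mutex.new
    @total_rows = 0
    @data = []
  end

  def <<(row)
    raise ArgumentError, 'Row must be an array' unless row.is_a?(Array)

    @mutex.synchronize do
      @data << row unless @data.size >= @in_memory_limit
      @total_rows += 1
    end
  end

  def data
    @mutex.synchronize { @data }
  end

  def total_rows
    @mutex.synchronize { @total_rows }
  end

  def limit_reached?
    data.size >= in_memory_limit
  end
end

stats = StreamingStats.new(3)
5.times { |i| stats << [i] }

p stats.data            # => [[0], [1], [2]]
p stats.total_rows      # => 5
p stats.limit_reached?  # => true

# Anything that is not an Array is rejected before it is counted.
rejected = begin
  stats << 'not a row'
  false
rescue ArgumentError
  true
end
p rejected              # => true
```

Only the first three rows are retained for previewing, while the counter keeps tracking everything that was streamed.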

#add_row(row) ⇒ Object

Convenience wrapper that delegates to #<<.



# File 'lib/dwh/streaming_stats.rb', line 43

def add_row(row)
  self << row
end

#data ⇒ Object

Returns the result set streamed so far, up to the specified limit (20,000 rows by default).



# File 'lib/dwh/streaming_stats.rb', line 49

def data
  @mutex.synchronize { @data }
end

#limit_reached? ⇒ Boolean

Whether the in-memory row limit has been reached.

Returns:

  • (Boolean)


# File 'lib/dwh/streaming_stats.rb', line 65

def limit_reached?
  data.size >= in_memory_limit
end

#max_row_size ⇒ Object

Size in bytes of the largest row seen so far, measured as row.to_s.bytesize. Can be used to estimate the eventual file size.



# File 'lib/dwh/streaming_stats.rb', line 60

def max_row_size
  @mutex.synchronize { @max_row_size }
end
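One possible way to turn these counters into a file size estimate is to multiply total_rows by max_row_size, which gives a rough upper bound. This is a sketch under assumptions: the class below is a local copy of the methods on this page, and the bound reflects Ruby's Array#to_s representation of a row, not whatever format is actually written to the file.

```ruby
# Local copy of the relevant methods from this page (assumption: trimmed
# to the counters needed for the estimate).
class StreamingStats
  def initialize
    @mutex = Mutex.new
    @total_rows = 0
    @max_row_size = 0
  end

  def <<(row)
    raise ArgumentError, 'Row must be an array' unless row.is_a?(Array)

    @mutex.synchronize do
      @total_rows += 1
      @max_row_size = [@max_row_size, row.to_s.bytesize].max
    end
  end

  def total_rows
    @mutex.synchronize { @total_rows }
  end

  def max_row_size
    @mutex.synchronize { @max_row_size }
  end
end

stats = StreamingStats.new
stats << [1, 'a']     # to_s is '[1, "a"]', 8 bytes
stats << [2, 'bbbb']  # to_s is '[2, "bbbb"]', 11 bytes

# Hypothetical upper bound: every row assumed to be as large as the largest.
estimated_bytes = stats.total_rows * stats.max_row_size
puts "largest row: #{stats.max_row_size} bytes"
puts "estimated upper bound: #{estimated_bytes} bytes"
```

The estimate overshoots when row sizes vary widely, so it is better suited to progress displays or disk space checks than to exact accounting.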

#reset ⇒ Object

Resets the collector: clears the in-memory data and sets the total row count and maximum row size back to 0.



# File 'lib/dwh/streaming_stats.rb', line 21

def reset
  @mutex.synchronize do
    @total_rows = 0
    @max_row_size = 0
    @data = []
  end
end

#total_rows ⇒ Object

The total number of rows streamed. This counts everything written to the IO object, including what is held in memory.



# File 'lib/dwh/streaming_stats.rb', line 55

def total_rows
  @mutex.synchronize { @total_rows }
end