Class: ROM::EventStore::Dataset

Inherits:
Object
  • Object
show all
Defined in:
lib/rom/event_store/dataset.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(category, connection, options = {}) ⇒ Dataset

Returns a new instance of Dataset.



6
7
8
9
10
# File 'lib/rom/event_store/dataset.rb', line 6

def initialize(category, connection, options = {})
  @category = category
  @connection = connection
  @options = options
end

Instance Attribute Details

#categoryObject (readonly)

Returns the value of attribute category.



4
5
6
# File 'lib/rom/event_store/dataset.rb', line 4

def category
  @category
end

Instance Method Details

#append(events) ⇒ Object



33
34
35
36
# File 'lib/rom/event_store/dataset.rb', line 33

def append(events)
  @connection.append(stream, events).sync
  events
end

#eachObject



44
45
46
# File 'lib/rom/event_store/dataset.rb', line 44

def each
  with_events { |event| yield(event) }
end

#eventsObject



29
30
31
# File 'lib/rom/event_store/dataset.rb', line 29

def events
  @connection.read(stream, @options).sync
end

#from(id) ⇒ Object



16
17
18
# File 'lib/rom/event_store/dataset.rb', line 16

def from(id)
  __new__(from: id)
end

#limit(limit) ⇒ Object



20
21
22
# File 'lib/rom/event_store/dataset.rb', line 20

def limit(limit)
  __new__(limit: limit)
end

#select(aggregate) ⇒ Object



12
13
14
# File 'lib/rom/event_store/dataset.rb', line 12

def select(aggregate)
  __new__(aggregate: aggregate)
end

#streamObject



24
25
26
27
# File 'lib/rom/event_store/dataset.rb', line 24

def stream
  aggregate = @options[:aggregate]
  aggregate ? "#{category}-#{aggregate}" : "$ce-#{category}"
end

#subscribeObject



38
39
40
41
42
# File 'lib/rom/event_store/dataset.rb', line 38

def subscribe
  subscription = @connection.subscription(stream, @options)
  subscription.on_event { |event| yield(dehydrate(event)) }
  subscription.start
end