Class: ROM::Kafka::Gateway
- Inherits:
-
Gateway
- Object
- Gateway
- ROM::Kafka::Gateway
- Extended by:
- AttributesDSL
- Defined in:
- lib/rom/kafka/gateway.rb
Overview
Describes the gateway to Kafka
The gateway has 3 responsibilities:
-
registers the datasets describing various topics and partitions
-
instantiates the producer connection to Kafka brokers that doesn't depend on a specific topic/partition settings
-
stores settings for the consumer connections to Kafka, that depends on a specific topic/partition/offset
Every dataset uses the same producer connection (defined by gateway) and individual consumer's one. The consumer's connection is reloaded every time the topic, partition or current offset is changed by a relation.
Instance Attribute Summary collapse
-
#producer ⇒ ROM::Kafka::Producer
readonly
The producer's connection to Kafka brockers.
Instance Method Summary collapse
-
#[](topic) ⇒ ROM::Kafka::Dataset
Returns the registered dataset by topic.
-
#dataset(topic) ⇒ self
Registers the dataset by topic.
-
#dataset?(topic) ⇒ Boolean
Checks whether a dataset is registered by topic.
-
#initialize(*addresses) ⇒ Gateway
constructor
Initializes the gateway to Kafka broker(s).
Constructor Details
#initialize(*addresses) ⇒ Gateway
Initializes the gateway to Kafka broker(s).
The initializer is attributes-agnostic. This means it doesn't validate attributes, but skips unused.
112 113 114 115 116 117 118 119 |
# File 'lib/rom/kafka/gateway.rb', line 112 def initialize(*addresses) # @todo: refactor the fat initializer = Hash[addresses.pop] brokers = Brokers.new(addresses, ).to_a # @todo: refactor using a factory super .merge(brokers: brokers) # prepares #attributes @producer = Connection::Producer.new(attributes) # @todo: refactor using a factory @datasets = {} end |
Instance Attribute Details
#producer ⇒ ROM::Kafka::Producer (readonly)
Returns the producer's connection to Kafka brockers.
125 126 127 |
# File 'lib/rom/kafka/gateway.rb', line 125 def producer @producer end |
Instance Method Details
#[](topic) ⇒ ROM::Kafka::Dataset
Returns the registered dataset by topic
133 134 135 |
# File 'lib/rom/kafka/gateway.rb', line 133 def [](topic) @datasets[topic.to_sym] end |
#dataset(topic) ⇒ self
Registers the dataset by topic
By default the dataset is registered with 0 partition and 0 offset. That settings can be changed from either relation of a command.
146 147 148 |
# File 'lib/rom/kafka/gateway.rb', line 146 def dataset(topic) @datasets[topic.to_sym] ||= Dataset.new(self, topic) end |
#dataset?(topic) ⇒ Boolean
Checks whether a dataset is registered by topic
156 157 158 |
# File 'lib/rom/kafka/gateway.rb', line 156 def dataset?(topic) self[topic] ? true : false end |