Class: ROM::Kafka::Connection::Producer Private
- Inherits:
- ROM::Kafka::Connection
  - Object
    - ROM::Kafka::Connection
      - ROM::Kafka::Connection::Producer
- Defined in:
- lib/rom/kafka/connection/producer.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
The producer-specific connection to a Kafka cluster.
It wraps the `Poseidon::Producer` driver and is responsible for adapting the poseidon API to ROM::Gateway via the [#initialize] and [#publish] methods.
The ROM::Kafka producer deals with tuples, hiding the poseidon-specific implementation of messages from the rest of the gem.
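As a rough illustration of how tuples can hide the driver's message class, the sketch below converts a plain hash into a message-like object. `FakeMessage` and `tuple_to_message` are hypothetical names used here so the example runs without the poseidon gem, and the tuple keys (`:topic`, `:value`, `:key`) are an assumption modelled on the `Poseidon::MessageToSend.new(topic, value, key)` argument order, not something this page confirms.

```ruby
# Hypothetical stand-in for Poseidon::MessageToSend, so the sketch runs
# without the poseidon gem installed.
FakeMessage = Struct.new(:topic, :value, :key)

# Converts a tuple (a plain hash) into a driver-level message.
# The :topic, :value and :key keys are assumed for illustration.
def tuple_to_message(tuple)
  FakeMessage.new(tuple.fetch(:topic), tuple.fetch(:value), tuple[:key])
end

message = tuple_to_message({ topic: "logs", value: "hello", key: nil })
puts message.topic
puts message.value
```

Callers only ever see hashes; the message class stays an internal detail of the connection.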
Constant Summary
- DRIVER =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
The 'poseidon' class describing a producer
Poseidon::Producer
- MESSAGE =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
The 'poseidon' class describing a message acceptable by producer
Poseidon::MessageToSend
Instance Attribute Summary
-
#connection ⇒ ROM::Kafka::Connection::Producer::DRIVER
readonly
private
Driver to Kafka.
Instance Method Summary
-
#initialize(options) ⇒ Producer
constructor
private
Initializes a producer connection.
-
#publish(*data) ⇒ Array<Hash{Symbol => String, nil}>
private
Sends tuples to the underlying connection.
Constructor Details
#initialize(options) ⇒ Producer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Initializes a producer connection.
The initializer is attribute-agnostic: it does not validate the options it receives and silently skips those it does not use.
# File 'lib/rom/kafka/connection/producer.rb', line 72
def initialize(options)
  # @todo: Refactor using factory method Connection.build_producer
  super # takes declared attributes only, skipping brokers and client_id
  brokers = options.fetch(:brokers)
  client = options.fetch(:client_id)
  @connection = DRIVER.new(brokers, client, attributes)
  @mutex = Mutex.new
end
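The "attribute-agnostic" behaviour can be sketched in isolation: required options are fetched (and raise when missing), recognised attributes are kept, and everything else is dropped without complaint. `split_options` and `KNOWN_ATTRIBUTES` are hypothetical names; the real list of declared attributes lives in the parent `ROM::Kafka::Connection` class and is not shown on this page.

```ruby
# Hypothetical list of attributes the driver understands; the actual list is
# declared elsewhere in the gem and may differ.
KNOWN_ATTRIBUTES = %i[required_acks ack_timeout_ms].freeze

# Splits raw options into the pieces the driver constructor needs.
# Unknown keys are skipped rather than rejected.
def split_options(options)
  brokers = options.fetch(:brokers)   # raises KeyError when missing
  client = options.fetch(:client_id)  # raises KeyError when missing
  attributes = options.select { |key, _| KNOWN_ATTRIBUTES.include?(key) }
  [brokers, client, attributes]
end

brokers, client, attributes = split_options(
  { brokers: ["localhost:9092"], client_id: "foo", required_acks: 1, unknown: :ignored }
)
p brokers
p client
p attributes
```

Note that only `:brokers` and `:client_id` are mandatory in this sketch; a misspelled optional key would be ignored silently, which is the trade-off of skipping validation.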
Instance Attribute Details
#connection ⇒ ROM::Kafka::Connection::Producer::DRIVER (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns driver to Kafka.
# File 'lib/rom/kafka/connection/producer.rb', line 43
def connection
  @connection
end
Instance Method Details
#publish(*data) ⇒ Array<Hash{Symbol => String, nil}>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sends tuples to the underlying connection.
Stringifies non-empty hash values to conform to the 'poseidon' API.
# File 'lib/rom/kafka/connection/producer.rb', line 89
def publish(*data)
  tuples = data.flatten.map(&method(:stringify_keys))
  messages = tuples.map(&method(:message))
  @mutex.synchronize { @connection.send_messages(messages) }
  tuples
end
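The publish flow (flatten the arguments, stringify non-nil values, send under a mutex, return the tuples) can be sketched with a stand-in driver. `FakeDriver` and `SketchProducer` are hypothetical classes written for this example; the fake's `send_messages` mirrors the name of the poseidon producer method, and for brevity it receives the tuples directly instead of real message objects.

```ruby
# Hypothetical driver that records what it was asked to send.
class FakeDriver
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_messages(messages)
    @sent.concat(messages)
    true
  end
end

# Minimal sketch of a producer wrapper; not the gem's implementation.
class SketchProducer
  def initialize(driver)
    @connection = driver
    @mutex = Mutex.new
  end

  # Accepts tuples or (nested) arrays of tuples and returns the
  # stringified tuples after handing them to the driver.
  def publish(*data)
    tuples = data.flatten.map { |tuple| stringify(tuple) }
    # Serialize access so concurrent callers do not interleave sends.
    @mutex.synchronize { @connection.send_messages(tuples) }
    tuples
  end

  private

  # Converts non-nil values to strings and leaves nil values untouched.
  def stringify(tuple)
    tuple.to_h { |key, value| [key.to_sym, value.nil? ? nil : value.to_s] }
  end
end

driver = FakeDriver.new
producer = SketchProducer.new(driver)
result = producer.publish(
  { value: 1, topic: :logs, key: nil },
  [{ value: "b", topic: "logs", key: 2 }]
)
p result
```

Returning the stringified tuples rather than the driver's response lets the caller see exactly what was submitted.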