Class: ROM::Kafka::Connection::Consumer Private

Inherits:
ROM::Kafka::Connection
Includes:
Enumerable
Defined in:
lib/rom/kafka/connection/consumer.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The consumer-specific connection to a Kafka cluster.

It wraps the `Poseidon::PartitionConsumer` driver and is responsible for adapting the poseidon API to ROM::Dataset via the [#initialize] and [#each] methods.

The ROM::Kafka consumer deals with tuples, hiding the poseidon-specific implementation of fetched messages from the rest of the gem.

Constant Summary

DRIVER =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

The 'poseidon'-specific class describing consumers

Returns:

  • (Class)
Poseidon::PartitionConsumer

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(opts) ⇒ Consumer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initializes a consumer connection

The initializer is attribute-agnostic: it does not validate the options it receives and silently skips any it does not use.

@todo: refactor using factory method Connection.build_consumer

Parameters:

  • opts (Hash)

    a customizable set of options

Options Hash (opts):

  • :client_id (#to_s)

    A required unique id used to identify the Kafka client.

  • :brokers (Array<String>)

    A list of seed brokers used to find the lead broker to fetch messages from.

  • :topic (String)

    A name of the topic to fetch messages from.

  • :partition (Integer)

    The number of the partition to fetch messages from.

  • :offset (Integer)

    An initial offset to start fetching from.

  • :min_bytes (Integer) — default: 1

    The smallest amount of data the server should send. (By default, data is sent as soon as it is ready.)

  • :max_bytes (Integer) — default: 1_048_576

    The maximum number of bytes the consumer fetches (1 MB by default).

  • :max_wait_ms (Integer) — default: 100

    How long to block until the server sends data. NOTE: This is only enforced if min_bytes is > 0.



# File 'lib/rom/kafka/connection/consumer.rb', line 59

def initialize(opts)
  super # takes declared attributes from options
  args = opts.values_at(:client_id, :brokers, :topic, :partition, :offset)
  @connection = DRIVER.consumer_for_partition(*args, attributes)
  @mutex = Mutex.new
end
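
The way the initializer splits its options can be sketched in plain Ruby, without a Kafka connection. The option values below are made up for illustration; only the keys come from the options list above. Which keys end up in the trailing `attributes` hash is an assumption here (the three tuning options with documented defaults), since the attribute declarations live in the parent class and are not shown on this page.

```ruby
# Hypothetical option values; only the keys are taken from the documentation.
opts = {
  client_id:   "my_consumer",
  brokers:     ["localhost:9092"],
  topic:       "logs",
  partition:   0,
  offset:      0,
  min_bytes:   1,
  max_bytes:   1_048_576,
  max_wait_ms: 100,
  unknown:     :ignored # an unused key is skipped, not rejected
}

# Positional arguments, in the order the initializer extracts them.
positional = opts.values_at(:client_id, :brokers, :topic, :partition, :offset)

# Assumption: the declared attributes are the three tuning options.
tuning_keys = %i[min_bytes max_bytes max_wait_ms]
tuning = opts.select { |key, _value| tuning_keys.include?(key) }
```

In this sketch `positional` and `tuning` correspond to `*args` and `attributes` in the call to `DRIVER.consumer_for_partition`.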

Instance Attribute Details

#connection ⇒ ROM::Kafka::Connection::Consumer::DRIVER (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the driver connected to Kafka.

Returns:

  • (ROM::Kafka::Connection::Consumer::DRIVER)


# File 'lib/rom/kafka/connection/consumer.rb', line 32

def connection
  @connection
end

Instance Method Details

#each {|tuple| ... } ⇒ Enumerator<Array<Hash{Symbol => String, Integer}>>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Iterates through Kafka messages

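Keeps fetching the next portion of messages and yields each tuple, stopping as soon as a fetch returns no messages. Returns an enumerator when called without a block.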

Yield Parameters:

  • tuple (Hash{Symbol => String, Integer})

Returns:

  • (Enumerator<Array<Hash{Symbol => String, Integer}>>)


# File 'lib/rom/kafka/connection/consumer.rb', line 82

def each
  return to_enum unless block_given?
  loop do
    tuples = fetch
    break unless tuples.any?
    tuples.each { |tuple| yield(tuple) }
  end
end
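
The loop above can be tried out without a broker. The following is a minimal sketch in which `BatchReader` is a hypothetical stand-in (not part of ROM::Kafka) whose `fetch` hands out prepared batches and then an empty array; its `each` is copied from the method shown above.

```ruby
# BatchReader is a hypothetical stand-in used only for illustration.
class BatchReader
  include Enumerable

  def initialize(batches)
    @batches = batches.dup
  end

  # Returns the next prepared batch, or an empty array once drained.
  def fetch
    @batches.shift || []
  end

  # Same control flow as Consumer#each: stop at the first empty batch.
  def each
    return to_enum unless block_given?
    loop do
      tuples = fetch
      break unless tuples.any?
      tuples.each { |tuple| yield(tuple) }
    end
  end
end

reader = BatchReader.new(
  [
    [{ value: "a", offset: 0 }, { value: "b", offset: 1 }],
    [{ value: "c", offset: 2 }]
  ]
)

# Enumerable methods work because #each is defined.
values = reader.map { |tuple| tuple[:value] }
```

Because iteration ends at the first empty batch, a caller that wants to keep polling has to call `each` (or `fetch`) again.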

#fetch ⇒ Array<Hash{Symbol => String, Integer}>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Fetches a single portion of messages and converts them to tuples. Only the call to the driver is guarded by the mutex; the conversion runs outside the lock.

Returns:

  • (Array<Hash{Symbol => String, Integer}>)


# File 'lib/rom/kafka/connection/consumer.rb', line 70

def fetch
  @mutex.synchronize { @connection.fetch }.map(&method(:tuple))
end
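
The private `tuple` helper is not shown on this page, so the following is only a sketch of the pattern: a synchronized driver call followed by a message-to-hash conversion. `FakeMessage`, `FakeDriver`, `SketchConsumer` and the fields copied into the hash are all assumptions made for illustration, not the gem's actual implementation.

```ruby
# Hypothetical stand-ins for the driver and its message objects.
FakeMessage = Struct.new(:topic, :key, :value, :offset)

class FakeDriver
  # Returns one canned message per call.
  def fetch
    [FakeMessage.new("logs", "k1", "hello", 0)]
  end
end

class SketchConsumer
  def initialize(driver)
    @connection = driver
    @mutex = Mutex.new
  end

  # Same shape as Consumer#fetch: lock around the driver call only,
  # then convert every message outside the lock.
  def fetch
    @mutex.synchronize { @connection.fetch }.map(&method(:tuple))
  end

  private

  # Assumed conversion; the real #tuple may select different fields.
  def tuple(message)
    {
      topic:  message.topic,
      key:    message.key,
      value:  message.value,
      offset: message.offset
    }
  end
end

tuples = SketchConsumer.new(FakeDriver.new).fetch
```

Each element of `tuples` is a plain hash with symbol keys, which matches the documented return type `Array<Hash{Symbol => String, Integer}>`.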