Class: ROM::Kafka::Commands::Create

Inherits:
Commands::Create
  • Object
show all
Defined in:
lib/rom/kafka/create.rb

Overview

The Kafka-specific implementation of ROM::Commands::Create

Examples:

ROM.use(:auto_registration)
ROM.setup(:kafka, "localhost:9092")

class Users < ROM::Relation[:kafka]
  dataset :users
end

class GreetUsers < ROM::Commands::Create[:kafka]
  relation :users
  register_as :greet
end

rom = ROM.finalize.env
greet = rom.commands(:users).greet
greet.with(key: "greetings").call "Hi!"
# => [{ value: "Hi!", topic: "users", key: "greetings" }]

Instance Method Summary collapse

Instance Method Details

#execute(*messages) ⇒ Array<Hash>

Sends messages to the current topic/partition of Kafka

Parameters:

  • messages (#to_s, Array<#to_s>)

Returns:

  • (Array<Hash>)


36
37
38
39
# File 'lib/rom/kafka/create.rb', line 36

def execute(*messages)
  tuples = messages.flatten.map(&method(:tuple))
  producer.publish(*tuples)
end

#with(options) ⇒ ROM::Kafka::Commands::Create

Returns a new command where `:key` option is updated

Parameters:

  • options (Hash)

Returns:



49
50
51
# File 'lib/rom/kafka/create.rb', line 49

def with(options)
  self.class.new relation, key: options.fetch(:key)
end