Detailed documentation: https://crystaldoc.info/github/BT-OpenSource/crafka/main/index.html
Add this to your application's shard.yml:
dependencies:
  crafka:
    github: bt-opensource/crafka

require "crafka"

producer = Kafka::Producer.new({"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4"})
producer.produce(topic: "topic_name", payload: "my message".to_slice)
# Optionally
producer.poll # Serves queued callbacks
producer.flush # Wait for outstanding produce requests to complete

All available args to #produce: topic, payload, key, timestamp.
librdkafka recommends that rd_kafka_poll be called at regular intervals to serve queued callbacks. This functionality is built into Crafka.
By default, after each #produce, a Kafka::Producer will call poll if it hasn't polled in the last 5 seconds.
You can configure this with the poll_interval argument:
producer = Kafka::Producer.new(
  {"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4"},
  poll_interval: 30
)

To disable auto polling, set poll_interval to 0.
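The throttling rule described above can be sketched in plain Crystal. This is only an illustration of the behaviour, not Crafka's actual implementation: the PollThrottle class, its after_produce method, and the injected `now` argument are hypothetical names introduced here so the example can run without a broker.

```crystal
# Hypothetical stand-in for the auto-poll rule; not part of Crafka.
class PollThrottle
  getter polls : Int32 = 0
  @interval : Time::Span
  @last_poll : Time::Span

  # poll_interval is in seconds; `now` is injectable so the logic is testable.
  def initialize(poll_interval : Int32 = 5, now : Time::Span = Time.monotonic)
    @interval = poll_interval.seconds
    @last_poll = now
  end

  # Meant to be called after every produce. Returns true when a poll is due.
  def after_produce(now : Time::Span = Time.monotonic) : Bool
    return false if @interval.zero?              # 0 disables auto polling
    return false if now - @last_poll < @interval # polled recently enough
    @last_poll = now
    @polls += 1 # a real producer would call poll at this point
    true
  end
end

throttle = PollThrottle.new(poll_interval: 5, now: 0.seconds)
puts throttle.after_produce(now: 2.seconds) # false: only 2s since last poll
puts throttle.after_produce(now: 6.seconds) # true: 6s elapsed, poll is due
puts throttle.after_produce(now: 7.seconds) # false: last poll was at 6s
```

Because the check only runs after a produce, a producer that goes idle will not poll on its own; calling producer.poll explicitly covers that case.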
To capture librdkafka's statistics, pass a stats_path argument to Kafka::Producer.new containing the path of a file to write them to.
Also set statistics.interval.ms in your producer config; librdkafka emits no statistics while it is left at its default of 0.
producer = Kafka::Producer.new(
  {"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4", "statistics.interval.ms" => "5000"},
  stats_path: "/some/directory/librdkafka_stats.json"
)

consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4", "group.id" => "consumer_group_name"})
consumer.subscribe("topic_name")
consumer.each do |message|
  # message is an instance of Kafka::Message
  puts "#{String.new(message.topic)} -> #{String.new(message.payload)}"
end
consumer.close

consumer.subscribe("topic_name", "another_topic", "more_and_more")
consumer.subscribe("^starts_with") # subscribe to multiple with a regex

make setup
crystal spec
- Update shard.yml and src/crafka.cr with new version number
- Update CHANGELOG.md with changes
- Commit and tag commit
Originally forked from: https://github.com/CloudKarafka/kafka.cr