
lua-resty-kafka

Kafka client driver based on OpenResty cosockets.


fetch_metadata

syntax: brokers, partitions = c:fetch_metadata(topic)

In case of success, returns all brokers and partitions of the topic. In case of errors, returns nil with a string describing the error.
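
The return convention above can be handled as in the following minimal sketch. The helper name describe_topic is hypothetical; `c` is assumed to be a client object that exposes fetch_metadata as documented here.

```lua
-- Hypothetical helper (not part of the library): wraps fetch_metadata and
-- normalizes its result into a single table or nil plus an error string.
local function describe_topic(c, topic)
    local brokers, partitions = c:fetch_metadata(topic)
    if not brokers then
        -- On failure, the second return value carries the error string.
        return nil, "fetch_metadata failed: " .. tostring(partitions)
    end
    return { brokers = brokers, partitions = partitions }
end
```

Because the error string arrives in the second return value, it should be read from `partitions` only after checking that `brokers` is nil.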


refresh

syntax: brokers, partitions = c:refresh()

Refreshes the metadata of all topics that have been fetched by fetch_metadata. In case of success, returns all brokers and all partitions of all topics. In case of errors, returns nil with a string describing the error.


Methods

send

syntax: ok, err = p:send(topic, key, message)

  1. In sync mode

    In case of success, returns the offset (cdata: LL) of the current broker and partition. In case of errors, returns nil with a string describing the error.

  2. In async mode

    The message is written to a buffer first. The buffer is sent to the Kafka server when it exceeds batch_num, or is flushed every flush_time.

    In case of success, returns true. In case of errors, returns nil with a string describing the error (buffer overflow).
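
The two return conventions can be told apart at the call site, as in this rough sketch. The helper name send_and_report is hypothetical; `p` is assumed to be a producer object exposing send as documented above.

```lua
-- Hypothetical helper (not part of the library): sends one message and
-- reports what happened, covering both the sync and async conventions.
local function send_and_report(p, topic, key, message)
    local ok, err = p:send(topic, key, message)
    if not ok then
        return nil, "send failed: " .. tostring(err)
    end
    if ok == true then
        -- async: the message was only accepted into the buffer
        return "buffered"
    end
    -- sync: the first return value is the offset (cdata LL under LuaJIT),
    -- so convert it to a Lua number before formatting
    return "sent, offset " .. tostring(tonumber(ok))
end
```

In the async case, a true result only means the message was buffered; delivery to the broker happens later, when the buffer is flushed.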


offset

syntax: sum, details = p:offset()

Returns the sum of all topic-partition offsets (as returned by the ProduceRequest API), and the details of each topic-partition.
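
A sketch of how the two return values might be consumed. It assumes, without confirmation from the text above, that details is a table keyed by topic and then by partition id, with the offset as the value. The helper name offset_lines is hypothetical.

```lua
-- Hypothetical helper (not part of the library). ASSUMPTION: `details` has
-- the shape details[topic][partition_id] = offset; verify this against the
-- library version in use before relying on it.
local function offset_lines(p)
    local sum, details = p:offset()
    local lines = { "total: " .. tostring(tonumber(sum)) }
    for topic, partitions in pairs(details) do
        for partition_id, offset in pairs(partitions) do
            lines[#lines + 1] = topic .. "/" .. tostring(partition_id)
                .. ": " .. tostring(tonumber(offset))
        end
    end
    return lines
end
```

Note that pairs does not guarantee iteration order, so the per-partition lines may come out in any order.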


Methods

new

syntax: c = bconsumer:new(broker_list, client_config)

The broker_list is a list of brokers, like the one below:

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // optional auth
        "sasl_config": {
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]
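
In Lua, the same structure would be written as a table and passed to new. The sketch below assumes the module is loaded as resty.kafka.basic-consumer, which is not stated in this section. The require is wrapped in pcall so that the snippet also loads outside OpenResty.

```lua
-- Lua equivalent of the broker list shown above.
local broker_list = {
    {
        host = "127.0.0.1",
        port = 9092,
        -- optional auth
        sasl_config = {
            mechanism = "PLAIN",
            user = "USERNAME",
            password = "PASSWORD",
        },
    },
}

-- ASSUMPTION: the module path "resty.kafka.basic-consumer". Loading it
-- needs OpenResty with the library installed, hence the pcall guard.
local loaded, bconsumer = pcall(require, "resty.kafka.basic-consumer")
local c
if loaded then
    c = bconsumer:new(broker_list)
end
```

The sasl_config entry can be left out entirely for brokers that do not require authentication.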

An optional client_config table can be specified. The options are as follows:

client config


list_offset

syntax: offset, err = c:list_offset(topic, partition, timestamp)

The timestamp parameter can be a UNIX timestamp or one of the constants defined in resty.kafka.protocol.consumer (LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX), which are used to get the latest offset, the initial offset, and so on. The semantics follow the ListOffsets API in Apache Kafka. See: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

In case of success, returns the offset for the specified timestamp. In case of errors, returns nil with a string describing the error.
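
As an illustration, the following sketch looks up both ends of a partition with two list_offset calls. The helper name offset_range is hypothetical. `consts` is assumed to be the table returned by require "resty.kafka.protocol.consumer". FIRST is assumed to select the initial offset and LAST the latest one, as the names suggest.

```lua
-- Hypothetical helper (not part of the library): returns a table with the
-- initial and latest offsets of one partition, or nil plus an error string.
local function offset_range(c, consts, topic, partition)
    local first, err = c:list_offset(topic, partition,
                                     consts.LIST_OFFSET_TIMESTAMP_FIRST)
    if not first then
        return nil, "list_offset(first) failed: " .. tostring(err)
    end
    local last, err2 = c:list_offset(topic, partition,
                                     consts.LIST_OFFSET_TIMESTAMP_LAST)
    if not last then
        return nil, "list_offset(last) failed: " .. tostring(err2)
    end
    return { first = first, last = last }
end
```

Passing the constants table in as a parameter keeps the helper independent of how the module is loaded.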


