fetch_metadata
syntax: brokers, partitions = c:fetch_metadata(topic)
In case of success, returns all brokers and partitions of the topic.
In case of errors, returns nil with a string describing the error.
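A minimal sketch of handling both outcomes might look like the following. The helper name `load_metadata` is hypothetical, and `c` is assumed to be an already constructed client object that exposes `fetch_metadata` as described above.

```lua
-- Hypothetical helper: wraps c:fetch_metadata(topic) and adds context
-- to the error string. `c` is assumed to be an existing client object.
local function load_metadata(c, topic)
    local brokers, partitions = c:fetch_metadata(topic)
    if not brokers then
        -- On failure the second return value carries the error string.
        return nil, "fetch_metadata(" .. topic .. ") failed: " .. tostring(partitions)
    end
    return brokers, partitions
end
```

The structure of the returned `brokers` and `partitions` tables is not described here, so the sketch passes them through untouched.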
refresh
syntax: brokers, partitions = c:refresh()
This refreshes the metadata of all topics that have been fetched by fetch_metadata.
In case of success, returns all brokers and all partitions of all topics.
In case of errors, returns nil with a string describing the error.
Methods
send
syntax: ok, err = p:send(topic, key, message)
In sync mode
In case of success, returns the offset (cdata: LL) of the current broker and partition. In case of errors, returns nil with a string describing the error.
In async mode
The message is written to the buffer first. The buffer is sent to the Kafka server when it exceeds batch_num, or when it is flushed every flush_time.
In case of success, returns true. In case of errors, returns nil with a string describing the error (buffer overflow).
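As a rough illustration, the two return conventions can be handled by one wrapper. The helper name `send_or_report` is hypothetical, and `p` is assumed to be an already constructed producer object.

```lua
-- Hypothetical helper: sends one message and normalizes the error.
-- `p` is assumed to be an existing producer object.
local function send_or_report(p, topic, key, message)
    local res, err = p:send(topic, key, message)
    if not res then
        -- Sync: the request failed. Async: for example, buffer overflow.
        return nil, "send to " .. topic .. " failed: " .. tostring(err)
    end
    -- Sync: res is the offset (cdata). Async: res is true, which only
    -- means the message was accepted into the buffer.
    return res
end
```

In the async case a truthy result does not confirm delivery, so callers that need the offset have to use the sync variant.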
offset
syntax: sum, details = p:offset()
Returns the sum of all topic-partition offsets (as returned by the ProduceRequest API), and the details of each topic-partition.
Methods
new
syntax: c = bconsumer:new(broker_list, client_config)
The broker_list is a list of brokers, like the one below:
    [
        {
            "host": "127.0.0.1",
            "port": 9092,
            // optional auth
            "sasl_config": {
                "mechanism": "PLAIN",
                "user": "USERNAME",
                "password": "PASSWORD"
            }
        }
    ]
An optional client_config table can be specified. The following options are supported:
client config
socket_timeout
Specifies the network timeout threshold in milliseconds. SHOULD be larger than the request_timeout.
keepalive_timeout
Specifies the maximal idle timeout (in milliseconds) for the keepalive connection.
keepalive_size
Specifies the maximal number of connections allowed in the connection pool per Nginx worker.
refresh_interval
Specifies the interval, in milliseconds, at which the metadata is refreshed automatically. The metadata is not refreshed automatically if this is nil.
ssl
Specifies whether the client should use an SSL connection. Defaults to false. See: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
ssl_verify
Specifies whether the client should perform SSL verification. Defaults to false. See: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
isolation_level
This setting controls the visibility of transactional records. See: https://kafka.apache.org/protocol.html
client_rack
Rack ID of the consumer making this request. See: https://kafka.apache.org/protocol.html
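A minimal sketch of building both arguments and creating a consumer might look like this. The module name `resty.kafka.basic-consumer` is an assumption (this section does not state it), and all option values are illustrative rather than recommended defaults. The `require` is guarded so the snippet also loads outside OpenResty.

```lua
-- Broker list in the shape shown above (sasl_config omitted).
local broker_list = {
    { host = "127.0.0.1", port = 9092 },
}

-- Illustrative values only; every option is optional.
local client_config = {
    socket_timeout = 3000,       -- ms, should exceed request_timeout
    keepalive_timeout = 600000,  -- ms
    keepalive_size = 2,          -- connections per Nginx worker
    refresh_interval = 30000,    -- ms; leave nil to disable auto refresh
    ssl = false,
    ssl_verify = false,
}

-- Assumed module name; only available inside OpenResty with the
-- library installed, hence the pcall guard.
local ok, bconsumer = pcall(require, "resty.kafka.basic-consumer")
local c
if ok then
    c = bconsumer:new(broker_list, client_config)
end
```

isolation_level and client_rack are left out because their accepted values are defined by the Kafka protocol documentation linked above.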
list_offset
syntax: offset, err = c:list_offset(topic, partition, timestamp)
The timestamp parameter can be a UNIX timestamp or one of the constants defined in resty.kafka.protocol.consumer: LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, or LIST_OFFSET_TIMESTAMP_MAX. The constants are used to get the initial and latest offsets, etc., with the same semantics as the ListOffsets API in Apache Kafka. See: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
In case of success, returns the offset of the specified case.
In case of errors, returns nil with a string describing the error.
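For example, the first and last offsets of a partition could be looked up with two calls. This is only a sketch: the helper name `offset_range` is hypothetical, `c` is assumed to be an existing consumer object, and `consts` is assumed to be the table returned by `require "resty.kafka.protocol.consumer"`.

```lua
-- Hypothetical helper: returns first, last on success,
-- or nil, nil, err if either lookup fails.
local function offset_range(c, consts, topic, partition)
    local first, err = c:list_offset(topic, partition,
                                     consts.LIST_OFFSET_TIMESTAMP_FIRST)
    if not first then
        return nil, nil, err
    end

    local last, err2 = c:list_offset(topic, partition,
                                     consts.LIST_OFFSET_TIMESTAMP_LAST)
    if not last then
        return nil, nil, err2
    end

    return first, last
end
```

Passing the constants table as an argument keeps the helper independent of how the module is loaded.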