Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apisix/plugins/error-log-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ local metadata_schema = {
},
uniqueItems = true,
},
tls = {
type = "object",
description = "tls config for connecting to kafka brokers",
properties = {
verify = {
type = "boolean",
default = false,
},
},
},
kafka_topic = {type = "string"},
producer_type = {
type = "string",
Expand Down Expand Up @@ -382,6 +392,10 @@ local function send_to_kafka(log_message)
broker_config["producer_type"] = config.kafka.producer_type
broker_config["required_acks"] = config.kafka.required_acks
broker_config["refresh_interval"] = config.kafka.meta_refresh_interval * 1000
if config.kafka.tls then
broker_config["ssl"] = true
broker_config["ssl_verify"] = config.kafka.tls.verify
end

-- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka
local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex,
Expand Down
14 changes: 14 additions & 0 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ local schema = {
},
uniqueItems = true,
},
tls = {
type = "object",
description = "tls config for connecting to kafka brokers",
properties = {
verify = {
type = "boolean",
default = false,
},
},
},
kafka_topic = {type = "string"},
producer_type = {
type = "string",
Expand Down Expand Up @@ -279,6 +289,10 @@ function _M.log(conf, ctx)
broker_config["flush_time"] = conf.producer_time_linger * 1000
broker_config["refresh_interval"] = conf.meta_refresh_interval * 1000
broker_config["api_version"] = conf.api_version
if conf.tls then
broker_config["ssl"] = true
broker_config["ssl_verify"] = conf.tls.verify
end

local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
broker_list, broker_config, conf.cluster_name)
Expand Down
31 changes: 31 additions & 0 deletions docs/en/latest/plugins/error-log-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ There are no attributes to configure this Plugin on Routes or Services. All conf
| kafka.brokers[].sasl_config.mechanism | string | False | PLAIN | ["PLAIN"] | The mechanism of SASL configuration. |
| kafka.brokers[].sasl_config.user | string | True | | | The user of SASL configuration. Required if `sasl_config` is present. |
| kafka.brokers[].sasl_config.password | string | True | | | The password of SASL configuration. Required if `sasl_config` is present. |
| kafka.tls | object | False | | | TLS configuration for connecting to Kafka brokers. |
| kafka.tls.verify | boolean | False | false | | If true, verify the Kafka broker TLS certificate. |
| kafka.kafka_topic | string | True | | | Target topic to push the logs for organization. |
| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| kafka.required_acks | integer | False | 1 | [-1, 0, 1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
Expand Down Expand Up @@ -238,3 +240,32 @@ curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger" -X PU
"inactive_timeout": 1
}'
```

### Send Logs to TLS-Enabled Kafka Brokers

The following example demonstrates how to configure the `error-log-logger` Plugin to send error logs to TLS-enabled Kafka brokers.

Configure the Plugin metadata with the Kafka broker details and TLS configuration:

```shell
curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger" -X PUT \
-H "X-API-KEY: ${admin_key}" \
-d '{
"kafka": {
"brokers": [
{
"host": "kafka.example.com",
"port": 9093
}
],
"kafka_topic": "apisix-error-logs",
"tls": {
"verify": true
}
},
"level": "ERROR",
"inactive_timeout": 1
}'
```

When using self-signed certificates, set `tls.verify` to `false` to skip certificate verification.
49 changes: 49 additions & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ It might take some time to receive the log data. It will be automatically sent a
| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | The mechanism of SASL config. |
| brokers.sasl_config.user | string | True | | | The user of `sasl_config`. Required if `sasl_config` is configured. |
| brokers.sasl_config.password | string | True | | | The password of `sasl_config`. Required if `sasl_config` is configured. |
| tls | object | False | | | TLS configuration for connecting to Kafka brokers. |
| tls.verify | boolean | False | false | | If true, verify the Kafka broker TLS certificate. |
| kafka_topic | string | True | | | Target topic to push the logs. |
| producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| required_acks | integer | False | 1 | [1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. `required_acks` cannot be 0. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
Expand Down Expand Up @@ -487,3 +489,50 @@ If you have customized the `log_format` in addition to setting `include_req_body
```

:::

### Log to TLS-Enabled Kafka Brokers

The following example demonstrates how to connect to TLS-enabled Kafka brokers, such as AWS MSK.

Create a Route with `kafka-logger` and configure the `tls` attribute to connect to the TLS-enabled Kafka broker:

```shell
curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
-H "X-API-KEY: ${admin_key}" \
-d '{
"id": "kafka-logger-tls-route",
"uri": "/get",
"plugins": {
"kafka-logger": {
"brokers": [
{
"host": "kafka.example.com",
"port": 9093
}
],
"kafka_topic": "test2",
"key": "key1",
"batch_max_size": 1,
"tls": {
"verify": true
}
}
},
"upstream": {
"nodes": {
"httpbin.org:80": 1
},
"type": "roundrobin"
}
}'
```

When using self-signed certificates, set `tls.verify` to `false` to skip certificate verification:

```json
{
"tls": {
"verify": false
}
}
```
119 changes: 119 additions & 0 deletions t/plugin/error-log-logger-kafka.t
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,122 @@ true
true
--- no_error_log
[alert]



=== TEST 8: tls schema validation - valid tls config
--- config
location /t {
content_by_lua_block {
local core = require("apisix.core")
local plugin = require("apisix.plugins.error-log-logger")
local ok, err = plugin.check_schema(
{
kafka = {
brokers = {
{
host = "127.0.0.1",
port = 9093
}
},
kafka_topic = "test2",
tls = { verify = false }
}
},
core.schema.TYPE_METADATA
)
if not ok then
ngx.say(err)
end
ngx.say("done")
}
}
--- response_body
done



=== TEST 9: tls schema validation - wrong type for verify
--- config
location /t {
content_by_lua_block {
local core = require("apisix.core")
local plugin = require("apisix.plugins.error-log-logger")
local ok, err = plugin.check_schema(
{
kafka = {
brokers = {
{
host = "127.0.0.1",
port = 9093
}
},
kafka_topic = "test2",
tls = { verify = "abc" }
}
},
core.schema.TYPE_METADATA
)
if not ok then
ngx.say(err)
end
ngx.say("done")
}
}
--- response_body
property "kafka" validation failed: property "tls" validation failed: property "verify" validation failed: wrong type: expected boolean, got string
done



=== TEST 10: put plugin metadata with tls config and log an error level message
--- config
location /t {
content_by_lua_block {
local core = require("apisix.core")
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/plugin_metadata/error-log-logger',
ngx.HTTP_PUT,
[[{
"kafka": {
"brokers": [{
"host": "127.0.0.1",
"port": 9093
}],
"producer_type": "sync",
"kafka_topic": "test2",
"tls": {"verify": false}
},
"level": "ERROR",
"inactive_timeout": 1
}]]
)
ngx.sleep(2)
core.log.error("this is a error message for tls test.")
}
}
--- error_log eval
[qr/this is a error message for tls test/,
qr/send data to kafka: .*tls test/]
--- wait: 3



=== TEST 11: delete metadata for the plugin, recover to the default
--- config
location /t {
content_by_lua_block {
local core = require("apisix.core")
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/plugin_metadata/error-log-logger',
ngx.HTTP_DELETE)

if code >= 300 then
ngx.status = code
end

ngx.say(body)
}
}
--- response_body
passed
Loading
Loading