From 63d67f74566d3453ef28af39a4b6366d900dec50 Mon Sep 17 00:00:00 2001 From: ecsimsw Date: Thu, 25 Jun 2026 10:37:13 +0900 Subject: [PATCH 1/5] feat(kafka-logger): add TLS support for Kafka brokers --- apisix/plugins/kafka-logger.lua | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 845fa684b8ba..c52996f698b6 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -89,6 +89,16 @@ local schema = { }, uniqueItems = true, }, + tls = { + type = "object", + description = "tls config for connecting to kafka brokers", + properties = { + verify = { + type = "boolean", + default = false, + }, + }, + }, kafka_topic = {type = "string"}, producer_type = { type = "string", @@ -279,6 +289,10 @@ function _M.log(conf, ctx) broker_config["flush_time"] = conf.producer_time_linger * 1000 broker_config["refresh_interval"] = conf.meta_refresh_interval * 1000 broker_config["api_version"] = conf.api_version + if conf.tls then + broker_config["ssl"] = true + broker_config["ssl_verify"] = conf.tls.verify + end local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, broker_list, broker_config, conf.cluster_name) From e6188df40aecd89dbfee8444ebdb9e7e559dc04c Mon Sep 17 00:00:00 2001 From: ecsimsw Date: Thu, 25 Jun 2026 10:37:46 +0900 Subject: [PATCH 2/5] feat(error-log-logger): add TLS support for Kafka brokers --- apisix/plugins/error-log-logger.lua | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apisix/plugins/error-log-logger.lua b/apisix/plugins/error-log-logger.lua index ee530e14ad91..e79831829faf 100644 --- a/apisix/plugins/error-log-logger.lua +++ b/apisix/plugins/error-log-logger.lua @@ -108,6 +108,16 @@ local metadata_schema = { }, uniqueItems = true, }, + tls = { + type = "object", + description = "tls config for connecting to kafka brokers", + properties = { + verify = { + type = "boolean", + default = false, + }, + }, + }, kafka_topic = {type = "string"}, producer_type = { type = "string", @@ -382,6 +392,10 @@ local function send_to_kafka(log_message) broker_config["producer_type"] = config.kafka.producer_type broker_config["required_acks"] = config.kafka.required_acks broker_config["refresh_interval"] = config.kafka.meta_refresh_interval * 1000 + if config.kafka.tls then + broker_config["ssl"] = true + broker_config["ssl_verify"] = config.kafka.tls.verify + end -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex, From f65e419312b1b963fa79aadd335f55a8529116ec Mon Sep 17 00:00:00 2001 From: ecsimsw Date: Thu, 25 Jun 2026 11:44:28 +0900 Subject: [PATCH 3/5] test(kafka-logger): add TLS schema and integration tests --- t/plugin/kafka-logger-tls.t | 185 ++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 t/plugin/kafka-logger-tls.t diff --git a/t/plugin/kafka-logger-tls.t b/t/plugin/kafka-logger-tls.t new file mode 100644 index 000000000000..df86c31355d3 --- /dev/null +++ b/t/plugin/kafka-logger-tls.t @@ -0,0 +1,185 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } +}); + +add_block_preprocessor(sub { + my ($block) = @_; + + my $extra_init_by_lua = <<_EOC_; + local bp_manager = require("apisix.utils.batch-processor-manager") + local core = require("apisix.core") + local function log_send_data(entry) + local data = type(entry) == "table" and core.json.encode(entry) or entry + core.log.info("send data to kafka: ", data) + end + local old_add = bp_manager.add_entry + bp_manager.add_entry = function(self, conf, entry, max_pending_entries) + local ok = old_add(self, conf, entry, max_pending_entries) + if ok then + log_send_data(entry) + end + return ok + end + local old_new = bp_manager.add_entry_to_new_processor + bp_manager.add_entry_to_new_processor = function(self, conf, entry, ctx, func, max_pending_entries) + local ok = old_new(self, conf, entry, ctx, func, max_pending_entries) + if ok then + log_send_data(entry) + end + return ok + end +_EOC_ + + if (!defined $block->extra_init_by_lua) { + $block->set_value("extra_init_by_lua", $extra_init_by_lua); + } +}); + +run_tests; + +__DATA__ + +=== TEST 1: tls schema validation - valid tls config +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + brokers = {{host = "127.0.0.1", port = 9093}}, + kafka_topic = "test", + tls = { verify = false } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 2: tls schema validation - without tls (backward compatibility) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + brokers = {{host = "127.0.0.1", port = 9092}}, + kafka_topic = "test" + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 3: tls schema validation - wrong type for verify +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + brokers = {{host = "127.0.0.1", port = 9093}}, + kafka_topic = "test", + tls = { verify = "abc" } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +property "tls" validation failed: property "verify" validation failed: wrong type: expected boolean, got string +done + + + +=== TEST 4: set route with tls config to SSL port +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins":{ + "kafka-logger":{ + "brokers":[ + { + "host":"127.0.0.1", + "port":9093 + }], + "kafka_topic":"test2", + "producer_type":"sync", + "key":"key1", + "timeout":1, + "batch_max_size":1, + "include_req_body": true, + "tls":{"verify":false} + } + }, + "upstream":{ + "nodes":{ + "127.0.0.1:1980":1 + }, + "type":"roundrobin" + }, + "uri":"/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 5: hit route, send data to kafka via TLS successfully +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- error_log eval +qr/send data to kafka: \{.*"body":"abcdef"/ +--- no_error_log +[error] +--- wait: 2 From bcaa0f68a67aa6efeb110b6d217eaba0db93adc5 Mon Sep 17 00:00:00 2001 From: ecsimsw Date: Thu, 25 Jun 2026 11:46:34 +0900 Subject: [PATCH 4/5] test(error-log-logger): add TLS schema and integration tests for Kafka --- t/plugin/error-log-logger-kafka.t | 119 ++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/t/plugin/error-log-logger-kafka.t b/t/plugin/error-log-logger-kafka.t index 0ae6a52aca90..cb4ced00f888 100644 --- a/t/plugin/error-log-logger-kafka.t +++ b/t/plugin/error-log-logger-kafka.t @@ -270,3 +270,122 @@ true true --- no_error_log [alert] + + + +=== TEST 8: tls schema validation - valid tls config +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local plugin = require("apisix.plugins.error-log-logger") + local ok, err = plugin.check_schema( + { + kafka = { + brokers = { + { + host = "127.0.0.1", + port = 9093 + } + }, + kafka_topic = "test2", + tls = { verify = false } + } + }, + core.schema.TYPE_METADATA + ) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 9: tls schema validation - wrong type for verify +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local plugin = require("apisix.plugins.error-log-logger") + local ok, err = plugin.check_schema( + { + kafka = { + brokers = { + { + host = "127.0.0.1", + port = 9093 + } + }, + kafka_topic = "test2", + tls = { verify = "abc" } + } + }, + core.schema.TYPE_METADATA + ) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +property "kafka" validation failed: property "tls" validation failed: property "verify" validation failed: wrong type: expected boolean, got string +done + + + +=== TEST 10: put plugin metadata with tls config and log an error level message +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/error-log-logger', + ngx.HTTP_PUT, + [[{ + "kafka": { + "brokers": [{ + "host": "127.0.0.1", + "port": 9093 + }], + "producer_type": "sync", + "kafka_topic": "test2", + "tls": {"verify": false} + }, + "level": "ERROR", + "inactive_timeout": 1 + }]] + ) + ngx.sleep(2) + core.log.error("this is a error message for tls test.") + } + } +--- error_log eval +[qr/this is a error message for tls test/, +qr/send data to kafka: .*tls test/] +--- wait: 3 + + + +=== TEST 11: delete metadata for the plugin, recover to the default +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/error-log-logger', + ngx.HTTP_DELETE) + + if code >= 300 then + ngx.status = code + end + + ngx.say(body) + } + } +--- response_body +passed From a1d30c3fc0b795efe357f453d2b447772e64462d Mon Sep 17 00:00:00 2001 From: ecsimsw Date: Thu, 25 Jun 2026 12:05:05 +0900 Subject: [PATCH 5/5] docs: add TLS attributes and examples for kafka-logger and error-log-logger --- docs/en/latest/plugins/error-log-logger.md | 31 ++++++++++++++ docs/en/latest/plugins/kafka-logger.md | 49 ++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/docs/en/latest/plugins/error-log-logger.md b/docs/en/latest/plugins/error-log-logger.md index 2af9dac59d32..bd46666d6d6b 100644 --- a/docs/en/latest/plugins/error-log-logger.md +++ b/docs/en/latest/plugins/error-log-logger.md @@ -66,6 +66,8 @@ There are no attributes to configure this Plugin on Routes or Services. All conf | kafka.brokers[].sasl_config.mechanism | string | False | PLAIN | ["PLAIN"] | The mechanism of SASL configuration. | | kafka.brokers[].sasl_config.user | string | True | | | The user of SASL configuration. Required if `sasl_config` is present. | | kafka.brokers[].sasl_config.password | string | True | | | The password of SASL configuration. Required if `sasl_config` is present. | +| kafka.tls | object | False | | | TLS configuration for connecting to Kafka brokers. | +| kafka.tls.verify | boolean | False | false | | If true, verify the Kafka broker TLS certificate. | | kafka.kafka_topic | string | True | | | Target topic to push the logs for organization. | | kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | kafka.required_acks | integer | False | 1 | [-1, 0, 1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | @@ -238,3 +240,32 @@ curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger" -X PU "inactive_timeout": 1 }' ``` + +### Send Logs to TLS-Enabled Kafka Brokers + +The following example demonstrates how to configure the `error-log-logger` Plugin to send error logs to TLS-enabled Kafka brokers. + +Configure the Plugin metadata with the Kafka broker details and TLS configuration: + +```shell +curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger" -X PUT \ + -H "X-API-KEY: ${admin_key}" \ + -d '{ + "kafka": { + "brokers": [ + { + "host": "kafka.example.com", + "port": 9093 + } + ], + "kafka_topic": "apisix-error-logs", + "tls": { + "verify": true + } + }, + "level": "ERROR", + "inactive_timeout": 1 + }' +``` + +When using self-signed certificates, set `tls.verify` to `false` to skip certificate verification. diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index f57081183822..22ecb3c3fb59 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -49,6 +49,8 @@ It might take some time to receive the log data. It will be automatically sent a | brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | The mechanism of SASL config. | | brokers.sasl_config.user | string | True | | | The user of `sasl_config`. Required if `sasl_config` is configured. | | brokers.sasl_config.password | string | True | | | The password of `sasl_config`. Required if `sasl_config` is configured. | +| tls | object | False | | | TLS configuration for connecting to Kafka brokers. | +| tls.verify | boolean | False | false | | If true, verify the Kafka broker TLS certificate. | | kafka_topic | string | True | | | Target topic to push the logs. | | producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | required_acks | integer | False | 1 | [1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. `required_acks` cannot be 0. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | @@ -487,3 +489,50 @@ If you have customized the `log_format` in addition to setting `include_req_body ``` ::: + +### Log to TLS-Enabled Kafka Brokers + +The following example demonstrates how to connect to TLS-enabled Kafka brokers, such as AWS MSK. + +Create a Route with `kafka-logger` and configure the `tls` attribute to connect to the TLS-enabled Kafka broker: + +```shell +curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ + -H "X-API-KEY: ${admin_key}" \ + -d '{ + "id": "kafka-logger-tls-route", + "uri": "/get", + "plugins": { + "kafka-logger": { + "brokers": [ + { + "host": "kafka.example.com", + "port": 9093 + } + ], + "kafka_topic": "test2", + "key": "key1", + "batch_max_size": 1, + "tls": { + "verify": true + } + } + }, + "upstream": { + "nodes": { + "httpbin.org:80": 1 + }, + "type": "roundrobin" + } + }' +``` + +When using self-signed certificates, set `tls.verify` to `false` to skip certificate verification: + +```json +{ + "tls": { + "verify": false + } +} +```