diff --git a/apisix/plugins/error-log-logger.lua b/apisix/plugins/error-log-logger.lua index ee530e14ad91..e79831829faf 100644 --- a/apisix/plugins/error-log-logger.lua +++ b/apisix/plugins/error-log-logger.lua @@ -108,6 +108,16 @@ local metadata_schema = { }, uniqueItems = true, }, + tls = { + type = "object", + description = "tls config for connecting to kafka brokers", + properties = { + verify = { + type = "boolean", + default = false, + }, + }, + }, kafka_topic = {type = "string"}, producer_type = { type = "string", @@ -382,6 +392,10 @@ local function send_to_kafka(log_message) broker_config["producer_type"] = config.kafka.producer_type broker_config["required_acks"] = config.kafka.required_acks broker_config["refresh_interval"] = config.kafka.meta_refresh_interval * 1000 + if config.kafka.tls then + broker_config["ssl"] = true + broker_config["ssl_verify"] = config.kafka.tls.verify + end -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex, diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 845fa684b8ba..c52996f698b6 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -89,6 +89,16 @@ local schema = { }, uniqueItems = true, }, + tls = { + type = "object", + description = "tls config for connecting to kafka brokers", + properties = { + verify = { + type = "boolean", + default = false, + }, + }, + }, kafka_topic = {type = "string"}, producer_type = { type = "string", @@ -279,6 +289,10 @@ function _M.log(conf, ctx) broker_config["flush_time"] = conf.producer_time_linger * 1000 broker_config["refresh_interval"] = conf.meta_refresh_interval * 1000 broker_config["api_version"] = conf.api_version + if conf.tls then + broker_config["ssl"] = true + broker_config["ssl_verify"] = conf.tls.verify + end local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, broker_list, broker_config, conf.cluster_name) diff --git a/docs/en/latest/plugins/error-log-logger.md b/docs/en/latest/plugins/error-log-logger.md index 2af9dac59d32..bd46666d6d6b 100644 --- a/docs/en/latest/plugins/error-log-logger.md +++ b/docs/en/latest/plugins/error-log-logger.md @@ -66,6 +66,8 @@ There are no attributes to configure this Plugin on Routes or Services. All conf | kafka.brokers[].sasl_config.mechanism | string | False | PLAIN | ["PLAIN"] | The mechanism of SASL configuration. | | kafka.brokers[].sasl_config.user | string | True | | | The user of SASL configuration. Required if `sasl_config` is present. | | kafka.brokers[].sasl_config.password | string | True | | | The password of SASL configuration. Required if `sasl_config` is present. | +| kafka.tls | object | False | | | TLS configuration for connecting to Kafka brokers. | +| kafka.tls.verify | boolean | False | false | | If true, verify the Kafka broker TLS certificate. | | kafka.kafka_topic | string | True | | | Target topic to push the logs for organization. | | kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | kafka.required_acks | integer | False | 1 | [-1, 0, 1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | @@ -238,3 +240,32 @@ curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger" -X PU "inactive_timeout": 1 }' ``` + +### Send Logs to TLS-Enabled Kafka Brokers + +The following example demonstrates how to configure the `error-log-logger` Plugin to send error logs to TLS-enabled Kafka brokers. + +Configure the Plugin metadata with the Kafka broker details and TLS configuration: + +```shell +curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger" -X PUT \ + -H "X-API-KEY: ${admin_key}" \ + -d '{ + "kafka": { + "brokers": [ + { + "host": "kafka.example.com", + "port": 9093 + } + ], + "kafka_topic": "apisix-error-logs", + "tls": { + "verify": true + } + }, + "level": "ERROR", + "inactive_timeout": 1 + }' +``` + +When using self-signed certificates, set `tls.verify` to `false` to skip certificate verification. diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index f57081183822..22ecb3c3fb59 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -49,6 +49,8 @@ It might take some time to receive the log data. It will be automatically sent a | brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | The mechanism of SASL config. | | brokers.sasl_config.user | string | True | | | The user of `sasl_config`. Required if `sasl_config` is configured. | | brokers.sasl_config.password | string | True | | | The password of `sasl_config`. Required if `sasl_config` is configured. | +| tls | object | False | | | TLS configuration for connecting to Kafka brokers. | +| tls.verify | boolean | False | false | | If true, verify the Kafka broker TLS certificate. | | kafka_topic | string | True | | | Target topic to push the logs. | | producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. | | required_acks | integer | False | 1 | [1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. `required_acks` cannot be 0. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. | @@ -487,3 +489,50 @@ If you have customized the `log_format` in addition to setting `include_req_body ``` ::: + +### Log to TLS-Enabled Kafka Brokers + +The following example demonstrates how to connect to TLS-enabled Kafka brokers, such as AWS MSK. + +Create a Route with `kafka-logger` and configure the `tls` attribute to connect to the TLS-enabled Kafka broker: + +```shell +curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ + -H "X-API-KEY: ${admin_key}" \ + -d '{ + "id": "kafka-logger-tls-route", + "uri": "/get", + "plugins": { + "kafka-logger": { + "brokers": [ + { + "host": "kafka.example.com", + "port": 9093 + } + ], + "kafka_topic": "test2", + "key": "key1", + "batch_max_size": 1, + "tls": { + "verify": true + } + } + }, + "upstream": { + "nodes": { + "httpbin.org:80": 1 + }, + "type": "roundrobin" + } + }' +``` + +When using self-signed certificates, set `tls.verify` to `false` to skip certificate verification: + +```json +{ + "tls": { + "verify": false + } +} +``` diff --git a/t/plugin/error-log-logger-kafka.t b/t/plugin/error-log-logger-kafka.t index 0ae6a52aca90..cb4ced00f888 100644 --- a/t/plugin/error-log-logger-kafka.t +++ b/t/plugin/error-log-logger-kafka.t @@ -270,3 +270,122 @@ true true --- no_error_log [alert] + + + +=== TEST 8: tls schema validation - valid tls config +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local plugin = require("apisix.plugins.error-log-logger") + local ok, err = plugin.check_schema( + { + kafka = { + brokers = { + { + host = "127.0.0.1", + port = 9093 + } + }, + kafka_topic = "test2", + tls = { verify = false } + } + }, + core.schema.TYPE_METADATA + ) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 9: tls schema validation - wrong type for verify +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local plugin = require("apisix.plugins.error-log-logger") + local ok, err = plugin.check_schema( + { + kafka = { + brokers = { + { + host = "127.0.0.1", + port = 9093 + } + }, + kafka_topic = "test2", + tls = { verify = "abc" } + } + }, + core.schema.TYPE_METADATA + ) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +property "kafka" validation failed: property "tls" validation failed: property "verify" validation failed: wrong type: expected boolean, got string +done + + + +=== TEST 10: put plugin metadata with tls config and log an error level message +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/error-log-logger', + ngx.HTTP_PUT, + [[{ + "kafka": { + "brokers": [{ + "host": "127.0.0.1", + "port": 9093 + }], + "producer_type": "sync", + "kafka_topic": "test2", + "tls": {"verify": false} + }, + "level": "ERROR", + "inactive_timeout": 1 + }]] + ) + ngx.sleep(2) + core.log.error("this is a error message for tls test.") + } + } +--- error_log eval +[qr/this is a error message for tls test/, +qr/send data to kafka: .*tls test/] +--- wait: 3 + + + +=== TEST 11: delete metadata for the plugin, recover to the default +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/error-log-logger', + ngx.HTTP_DELETE) + + if code >= 300 then + ngx.status = code + end + + ngx.say(body) + } + } +--- response_body +passed diff --git a/t/plugin/kafka-logger-tls.t b/t/plugin/kafka-logger-tls.t new file mode 100644 index 000000000000..df86c31355d3 --- /dev/null +++ b/t/plugin/kafka-logger-tls.t @@ -0,0 +1,185 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } +}); + +add_block_preprocessor(sub { + my ($block) = @_; + + my $extra_init_by_lua = <<_EOC_; + local bp_manager = require("apisix.utils.batch-processor-manager") + local core = require("apisix.core") + local function log_send_data(entry) + local data = type(entry) == "table" and core.json.encode(entry) or entry + core.log.info("send data to kafka: ", data) + end + local old_add = bp_manager.add_entry + bp_manager.add_entry = function(self, conf, entry, max_pending_entries) + local ok = old_add(self, conf, entry, max_pending_entries) + if ok then + log_send_data(entry) + end + return ok + end + local old_new = bp_manager.add_entry_to_new_processor + bp_manager.add_entry_to_new_processor = function(self, conf, entry, ctx, func, max_pending_entries) + local ok = old_new(self, conf, entry, ctx, func, max_pending_entries) + if ok then + log_send_data(entry) + end + return ok + end +_EOC_ + + if (!defined $block->extra_init_by_lua) { + $block->set_value("extra_init_by_lua", $extra_init_by_lua); + } +}); + +run_tests; + +__DATA__ + +=== TEST 1: tls schema validation - valid tls config +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + brokers = {{host = "127.0.0.1", port = 9093}}, + kafka_topic = "test", + tls = { verify = false } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 2: tls schema validation - without tls (backward compatibility) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + brokers = {{host = "127.0.0.1", port = 9092}}, + kafka_topic = "test" + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 3: tls schema validation - wrong type for verify +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + brokers = {{host = "127.0.0.1", port = 9093}}, + kafka_topic = "test", + tls = { verify = "abc" } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +property "tls" validation failed: property "verify" validation failed: wrong type: expected boolean, got string +done + + + +=== TEST 4: set route with tls config to SSL port +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins":{ + "kafka-logger":{ + "brokers":[ + { + "host":"127.0.0.1", + "port":9093 + }], + "kafka_topic":"test2", + "producer_type":"sync", + "key":"key1", + "timeout":1, + "batch_max_size":1, + "include_req_body": true, + "tls":{"verify":false} + } + }, + "upstream":{ + "nodes":{ + "127.0.0.1:1980":1 + }, + "type":"roundrobin" + }, + "uri":"/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 5: hit route, send data to kafka via TLS successfully +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- error_log eval +qr/send data to kafka: \{.*"body":"abcdef"/ +--- no_error_log +[error] +--- wait: 2