From 016ae0524f79278d039789f1e93a9749cef58c55 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 24 Aug 2023 11:14:02 -0300 Subject: [PATCH] fix(aeh_producer): remove timestamp template field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://emqx.atlassian.net/browse/EMQX-10847 Checking the whole Kafka message from AEH, it seems like the timestamp type is append, which means that it’s the broker who controls the timestamp, and the timestamp defined by the producer is ignored. Ref: https://kafka.apache.org/documentation/#brokerconfigs_log.message.timestamp.type Example message consumed from AEH: ``` %{ "headers" => %{}, "key" => "", "offset" => 4, "topic" => "test0", "ts" => 1692879703006, "ts_type" => "append", "value" => "{\"username\":\"undefined\",\"topic\":\"t/aeh/produ\",\"timestamp\":1692879692189,\"qos\":0,\"publish_received_at\":1692879692189,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaa\",\"node\":\"emqx@127.0.0.1\",\"metadata\":{\"rule_id\":\"rule_aehp\"},\"id\":\"000603AA44B34E08F4AF000006E30003\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}" } ``` Note the ts_type above is append. Example message from a Kafka broker whose ts type is create: ``` %{ "headers" => %{}, "key" => "", "offset" => 4, "topic" => "test-topic-three-partitions", "ts" => 1692881883668, "ts_type" => "create", "value" => "{\"username\":\"undefined\",\"topic\":\"t/kafka/produ\",\"timestamp\":1692881883668,\"qos\":0,\"publish_received_at\":1692881883668,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaaaa\",\"node\":\"emqx@127.0.0.1\",\"id\":\"000603AAC7529FEEF4AC000007050000\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}" } ``` Unfortunately, I couldn’t find anywhere in AEH where that configuration could be changed. --- .../src/emqx_bridge_azure_event_hub.app.src | 2 +- .../src/emqx_bridge_azure_event_hub.erl | 6 ++++++ .../test/emqx_bridge_azure_event_hub_producer_SUITE.erl | 5 ----- .../test/emqx_bridge_azure_event_hub_tests.erl | 1 - 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index e29e9c83a..43033b657 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_azure_event_hub, [ {description, "EMQX Enterprise Azure Event Hub Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 2d6343b74..c563a35d8 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -64,6 +64,10 @@ fields(producer_kafka_opts) -> kafka_producer_overrides() ), override_documentations(Fields); +fields(kafka_message) -> + Fields0 = emqx_bridge_kafka:fields(kafka_message), + Fields = proplists:delete(timestamp, Fields0), + override_documentations(Fields); fields(Method) -> Fields = emqx_bridge_kafka:fields(Method), override_documentations(Fields). @@ -85,6 +89,7 @@ desc(Name) -> struct_names() -> [ auth_username_password, + kafka_message, producer_kafka_opts ]. @@ -245,6 +250,7 @@ kafka_producer_overrides() -> default => no_compression, importance => ?IMPORTANCE_HIDDEN }), + message => mk(ref(kafka_message), #{}), required_acks => mk(enum([all_isr, leader_only]), #{default => all_isr}) }. diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl index af4b87718..e77d724d0 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl @@ -65,10 +65,6 @@ init_per_suite(Config) -> end. end_per_suite(Config) -> - %% emqx_mgmt_api_test_util:end_suite(), - %% ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - %% ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), - %% _ = application:stop(emqx_connector), Apps = ?config(tc_apps, Config), emqx_cth_suite:stop(Apps), ok. @@ -145,7 +141,6 @@ bridge_config(TestCase, Config) -> <<"message">> => #{ <<"key">> => <<"${.clientid}">>, - <<"timestamp">> => <<"${.timestamp}">>, <<"value">> => <<"${.}">> }, <<"partition_count_refresh_interval">> => <<"60s">>, diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl index d624421c6..92d268d20 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl @@ -33,7 +33,6 @@ bridges.azure_event_hub_producer.my_producer { max_inflight = 10 message { key = \"${.clientid}\" - timestamp = \"${.timestamp}\" value = \"${.}\" } partition_count_refresh_interval = 60s