From 59e4db98f7ff118a1b551d8f7b86b551943dd8b5 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 16 Feb 2024 12:11:45 +0100 Subject: [PATCH 1/2] test(bridge_mqtt): Stop snabbkaffe servers --- .../test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index 62e0e4f51..b9097b9c3 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -77,6 +77,7 @@ init_per_testcase(TestCase, Config) -> ]. end_per_testcase(_TestCase, _Config) -> + snabbkaffe:stop(), emqx_common_test_helpers:call_janitor(), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), ok. From 8cfb22f0b89d74442b0c3a669f618e912b1b81e6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 16 Feb 2024 12:42:48 +0100 Subject: [PATCH 2/2] fix(ds): Retry getting the shard leader --- .../src/emqx_ds_replication_layer_egress.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 8b37b29cb..6c1499620 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -90,7 +90,7 @@ init([DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), %% TODO: adjust leader dynamically - {ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard), + Leader = shard_leader(DB, Shard), S = #s{ db = DB, shard = Shard, @@ -173,3 +173,13 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send_after(Interval, self(), ?flush). + +shard_leader(DB, Shard) -> + %% TODO: use optvar + case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of + {ok, Leader} -> + Leader; + {error, no_leader_for_shard} -> + timer:sleep(500), + shard_leader(DB, Shard) + end.