diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl
index 852a667c8..a300291d1 100644
--- a/apps/emqx_authz/src/emqx_authz_http.erl
+++ b/apps/emqx_authz/src/emqx_authz_http.erl
@@ -82,7 +82,7 @@ authorize(
} = Config
) ->
Request = generate_request(PubSub, Topic, Client, Config),
- case emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of
+ try emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of
{ok, 204, _Headers} ->
{matched, allow};
{ok, 200, Headers, Body} ->
@@ -112,6 +112,16 @@ authorize(
reason => Reason
}),
ignore
+ catch
+ error:timeout ->
+ Reason = timeout,
+ ?tp(authz_http_request_failure, #{error => Reason}),
+ ?SLOG(error, #{
+ msg => "http_server_query_failed",
+ resource => ResourceID,
+ reason => Reason
+ }),
+ ignore
end.
log_nomtach_msg(Status, Headers, Body) ->
diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl
index e91da9829..e5a72f680 100644
--- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl
@@ -172,7 +172,7 @@ t_response_handling(_Config) ->
[
#{
?snk_kind := authz_http_request_failure,
- error := {recoverable_error, econnrefused}
+ error := timeout
}
],
?of_kind(authz_http_request_failure, Trace)
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index c86087014..38fe0a144 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -170,8 +170,11 @@ send_message(BridgeId, Message) ->
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
not_found ->
{error, {bridge_not_found, BridgeId}};
- #{enable := true} ->
- emqx_resource:query(ResId, {send_message, Message});
+ #{enable := true} = Config ->
+ Timeout = emqx_map_lib:deep_get(
+ [resource_opts, request_timeout], Config, timer:seconds(15)
+ ),
+ emqx_resource:query(ResId, {send_message, Message}, #{timeout => Timeout});
#{enable := false} ->
{error, {bridge_stopped, BridgeId}}
end.
diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
index 1f03863ae..d20d3bc10 100644
--- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
@@ -145,10 +145,12 @@ set_special_configs(_) ->
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
+ ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
emqx_common_test_helpers:call_janitor(),
+ snabbkaffe:stop(),
ok.
clear_resources() ->
@@ -478,8 +480,6 @@ t_egress_custom_clientid_prefix(_Config) ->
end,
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
- {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
-
ok.
t_mqtt_conn_bridge_ingress_and_egress(_) ->
@@ -830,6 +830,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>,
+ <<"request_timeout">> => <<"500ms">>,
%% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">>
}
@@ -880,17 +881,14 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
ok = emqx_listeners:stop_listener('tcp:default'),
ct:sleep(1500),
- %% PUBLISH 2 messages to the 'local' broker, the message should
- ok = snabbkaffe:start_trace(),
+ %% PUBLISH 2 messages to the 'local' broker, the messages should
+ %% be enqueued and the resource will block
{ok, SRef} =
snabbkaffe:subscribe(
fun
- (
- #{
- ?snk_kind := call_query_enter,
- query := {query, _From, {send_message, #{}}, _Sent}
- }
- ) ->
+ (#{?snk_kind := resource_worker_retry_inflight_failed}) ->
+ true;
+ (#{?snk_kind := resource_worker_flush_nack}) ->
true;
(_) ->
false
@@ -903,7 +901,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
emqx:publish(emqx_message:make(LocalTopic, Payload1)),
emqx:publish(emqx_message:make(LocalTopic, Payload2)),
{ok, _} = snabbkaffe:receive_events(SRef),
- ok = snabbkaffe:stop(),
%% verify the metrics of the bridge, the message should be queued
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
index 0b6cbd0a2..de76967ab 100644
--- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
+++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
@@ -89,6 +89,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
}
}
+ request_timeout {
+ desc {
+ en: """Timeout for requests. If query_mode is sync, calls to the resource will be blocked for this amount of time before timing out."""
+ zh: """请求的超时。 如果query_mode是sync,对资源的调用将在超时前被阻断这一时间。"""
+ }
+ label {
+ en: """Request timeout"""
+ zh: """请求超时"""
+ }
+ }
+
enable_batch {
desc {
en: """Batch mode enabled."""
diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl
index b1a34355b..2ef1cbed4 100644
--- a/apps/emqx_resource/src/emqx_resource_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_worker.erl
@@ -100,7 +100,7 @@ start_link(Id, Index, Opts) ->
-spec sync_query(id(), request(), query_opts()) -> Result :: term().
sync_query(Id, Request, Opts) ->
PickKey = maps:get(pick_key, Opts, self()),
- Timeout = maps:get(timeout, Opts, infinity),
+ Timeout = maps:get(timeout, Opts, timer:seconds(15)),
emqx_resource_metrics:matched_inc(Id),
pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
@@ -234,10 +234,7 @@ blocked(cast, flush, Data) ->
blocked(state_timeout, unblock, St) ->
resume_from_blocked(St);
blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) ->
- #{id := Id} = Data0,
- {Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
- Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
- _ = batch_reply_caller(Id, Error, Queries),
+ {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
{keep_state, Data};
blocked(info, {flush, _Ref}, _Data) ->
keep_state_and_data;
@@ -337,10 +334,16 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
} = Data0,
?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
QueryOpts = #{},
- %% if we are retrying an inflight query, it has been sent
- HasBeenSent = true,
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
- case handle_query_result_pure(Id, Result, HasBeenSent) of
+ ReplyResult =
+ case QueryOrBatch of
+ ?QUERY(From, CoreReq, HasBeenSent) ->
+ Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
+ reply_caller_defer_metrics(Id, Reply);
+ [?QUERY(_, _, _) | _] = Batch ->
+ batch_reply_caller_defer_metrics(Id, Result, Batch)
+ end,
+ case ReplyResult of
%% Send failed because resource is down
{nack, PostFn} ->
PostFn(),
@@ -476,27 +479,20 @@ do_flush(
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
case reply_caller(Id, Reply) of
%% Failed; remove the request from the queue, as we cannot pop
- %% from it again. But we must ensure it's in the inflight
- %% table, even if it's full, so we don't lose the request.
- %% And only in that case.
+ %% from it again, but we'll retry it using the inflight table.
nack ->
ok = replayq:ack(Q1, QAckRef),
- %% We might get a retriable response without having added
- %% the request to the inflight table (e.g.: sync request,
- %% but resource health check failed prior to calling and
- %% so we didn't even call it). In that case, we must then
- %% add it to the inflight table.
- IsRetriable =
- is_recoverable_error_result(Result) orelse
- is_not_connected_result(Result),
- ShouldPreserveInInflight = is_not_connected_result(Result),
%% we set it atomically just below; a limitation of having
%% to use tuples for atomic ets updates
+ IsRetriable = true,
WorkerMRef0 = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0),
- ShouldPreserveInInflight andalso
- inflight_append(InflightTID, InflightItem, Id, Index),
- IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref),
+ %% we must append again to the table to ensure that the
+ %% request will be retried (i.e., it might not have been
+ %% inserted during `call_query' if the resource was down
+ %% and/or if it was a sync request).
+ inflight_append(InflightTID, InflightItem, Id, Index),
+ mark_inflight_as_retriable(InflightTID, Ref),
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@@ -513,11 +509,21 @@ do_flush(
%% Success; just ack.
ack ->
ok = replayq:ack(Q1, QAckRef),
+ %% Async requests are acked later when the async worker
+ %% calls the corresponding callback function. Also, we
+ %% must ensure the async worker is being monitored for
+ %% such requests.
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
- ?tp(resource_worker_flush_ack, #{batch_or_query => Request}),
+ ?tp(
+ resource_worker_flush_ack,
+ #{
+ batch_or_query => Request,
+ result => Result
+ }
+ ),
case queue_count(Q1) > 0 of
true ->
{keep_state, Data1, [{next_event, internal, flush}]};
@@ -542,27 +548,20 @@ do_flush(Data0, #{
Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
case batch_reply_caller(Id, Result, Batch) of
%% Failed; remove the request from the queue, as we cannot pop
- %% from it again. But we must ensure it's in the inflight
- %% table, even if it's full, so we don't lose the request.
- %% And only in that case.
+ %% from it again, but we'll retry it using the inflight table.
nack ->
ok = replayq:ack(Q1, QAckRef),
- %% We might get a retriable response without having added
- %% the request to the inflight table (e.g.: sync request,
- %% but resource health check failed prior to calling and
- %% so we didn't even call it). In that case, we must then
- %% add it to the inflight table.
- IsRetriable =
- is_recoverable_error_result(Result) orelse
- is_not_connected_result(Result),
- ShouldPreserveInInflight = is_not_connected_result(Result),
%% we set it atomically just below; a limitation of having
%% to use tuples for atomic ets updates
+ IsRetriable = true,
WorkerMRef0 = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0),
- ShouldPreserveInInflight andalso
- inflight_append(InflightTID, InflightItem, Id, Index),
- IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref),
+ %% we must append again to the table to ensure that the
+ %% request will be retried (i.e., it might not have been
+ %% inserted during `call_query' if the resource was down
+ %% and/or if it was a sync request).
+ inflight_append(InflightTID, InflightItem, Id, Index),
+ mark_inflight_as_retriable(InflightTID, Ref),
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@@ -579,11 +578,21 @@ do_flush(Data0, #{
%% Success; just ack.
ack ->
ok = replayq:ack(Q1, QAckRef),
+ %% Async requests are acked later when the async worker
+ %% calls the corresponding callback function. Also, we
+ %% must ensure the async worker is being monitored for
+ %% such requests.
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
- ?tp(resource_worker_flush_ack, #{batch_or_query => Batch}),
+ ?tp(
+ resource_worker_flush_ack,
+ #{
+ batch_or_query => Batch,
+ result => Result
+ }
+ ),
CurrentCount = queue_count(Q1),
case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} ->
@@ -597,54 +606,79 @@ do_flush(Data0, #{
end.
batch_reply_caller(Id, BatchResult, Batch) ->
- {ShouldBlock, PostFns} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch),
- lists:foreach(fun(F) -> F() end, PostFns),
- ShouldBlock.
-
-batch_reply_caller_defer_metrics(Id, BatchResult, Batch) ->
- lists:foldl(
- fun(Reply, {_ShouldBlock, PostFns}) ->
- {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply),
- {ShouldBlock, [PostFn | PostFns]}
- end,
- {ack, []},
- %% the `Mod:on_batch_query/3` returns a single result for a batch,
- %% so we need to expand
- ?EXPAND(BatchResult, Batch)
- ).
-
-reply_caller(Id, Reply) ->
- {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply),
+ {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch),
PostFn(),
ShouldBlock.
+batch_reply_caller_defer_metrics(Id, BatchResult, Batch) ->
+ {ShouldAck, PostFns} =
+ lists:foldl(
+ fun(Reply, {_ShouldAck, PostFns}) ->
+ {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply),
+ {ShouldAck, [PostFn | PostFns]}
+ end,
+ {ack, []},
+ %% the `Mod:on_batch_query/3` returns a single result for a batch,
+ %% so we need to expand
+ ?EXPAND(BatchResult, Batch)
+ ),
+ PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
+ {ShouldAck, PostFn}.
+
+reply_caller(Id, Reply) ->
+ {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply),
+ PostFn(),
+ ShouldAck.
+
+%% Should only reply to the caller when the decision is final (not
+%% retriable). See comment on `handle_query_result_pure'.
reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result)) ->
handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result)) when
is_function(ReplyFun)
->
- _ =
- case Result of
- {async_return, _} -> no_reply_for_now;
- _ -> apply(ReplyFun, Args ++ [Result])
- end,
- handle_query_result_pure(Id, Result, HasBeenSent);
+ {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
+ case {ShouldAck, Result} of
+ {nack, _} ->
+ ok;
+ {ack, {async_return, _}} ->
+ ok;
+ {ack, _} ->
+ apply(ReplyFun, Args ++ [Result]),
+ ok
+ end,
+ {ShouldAck, PostFn};
reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result)) ->
- gen_statem:reply(From, Result),
- handle_query_result_pure(Id, Result, HasBeenSent).
+ {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
+ case {ShouldAck, Result} of
+ {nack, _} ->
+ ok;
+ {ack, {async_return, _}} ->
+ ok;
+ {ack, _} ->
+ gen_statem:reply(From, Result),
+ ok
+ end,
+ {ShouldAck, PostFn}.
handle_query_result(Id, Result, HasBeenSent) ->
{ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
PostFn(),
ShouldBlock.
-handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent) ->
+%% We should always retry (nack), except when:
+%% * resource is not found
+%% * resource is stopped
+%% * the result is a success (or at least a delayed result)
+%% We also retry even sync requests. In that case, we shouldn't reply
+%% the caller until one of those final results above happen.
+handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{msg => resource_exception, info => Msg}),
- inc_sent_failed(Id, HasBeenSent),
+ %% inc_sent_failed(Id, HasBeenSent),
ok
end,
- {ack, PostFn};
+ {nack, PostFn};
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
NotWorking == not_connected; NotWorking == blocked
->
@@ -666,10 +700,12 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
- emqx_resource_metrics:dropped_other_inc(Id),
+ %% emqx_resource_metrics:dropped_other_inc(Id),
ok
end,
- {ack, PostFn};
+ {nack, PostFn};
+%% TODO: invert this logic: we should differentiate errors that are
+%% irrecoverable; all others are deemed recoverable.
handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) ->
%% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
@@ -679,22 +715,18 @@ handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent)
ok
end,
{nack, PostFn};
-handle_query_result_pure(Id, {error, Reason}, HasBeenSent) ->
+handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
- inc_sent_failed(Id, HasBeenSent),
ok
end,
- {ack, PostFn};
-handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent) ->
- {nack, fun() -> ok end};
-handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent) ->
+ {nack, PostFn};
+handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
- inc_sent_failed(Id, HasBeenSent),
ok
end,
- {ack, PostFn};
+ {nack, PostFn};
handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) ->
{ack, fun() -> ok end};
handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) ->
@@ -714,18 +746,6 @@ handle_async_worker_down(Data0, Pid) ->
cancel_inflight_items(Data, WorkerMRef),
{keep_state, Data}.
-is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when
- Error =:= not_connected; Error =:= blocked
-->
- true;
-is_not_connected_result(_) ->
- false.
-
-is_recoverable_error_result({error, {recoverable_error, _Reason}}) ->
- true;
-is_recoverable_error_result(_) ->
- false.
-
call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query}),
case emqx_resource_manager:ets_lookup(Id) of
@@ -735,8 +755,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
configured -> maps:get(query_mode, Data);
_ -> QM0
end,
- CM = maps:get(callback_mode, Data),
- apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
+ CBM = maps:get(callback_mode, Data),
+ CallMode = call_mode(QM, CBM),
+ apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
{ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
@@ -763,20 +784,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
end
).
-apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
- ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => sync}),
- InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
- ?APPLY_RESOURCE(
- call_query,
- begin
- IsRetriable = false,
- WorkerMRef = undefined,
- InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
- ok = inflight_append(InflightTID, InflightItem, Id, Index),
- Mod:on_query(Id, Request, ResSt)
- end,
- Request
- );
+apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
+ ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
+ ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
@@ -796,23 +806,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
end,
Request
);
-apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
+apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}),
- InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
- ?APPLY_RESOURCE(
- call_batch_query,
- begin
- IsRetriable = false,
- WorkerMRef = undefined,
- InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
- ok = inflight_append(InflightTID, InflightItem, Id, Index),
- Mod:on_batch_query(Id, Requests, ResSt)
- end,
- Batch
- );
+ ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
@@ -839,27 +838,27 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
{Action, PostFn} = reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)),
- %% Should always ack async inflight requests that
- %% returned, otherwise the request will get retried. The
- %% caller has just been notified of the failure and should
- %% decide if it wants to retry or not.
- IsFullBefore = is_inflight_full(InflightTID),
- IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
- IsAcked andalso PostFn(),
case Action of
nack ->
+ %% Keep retrying.
?tp(resource_worker_reply_after_query, #{
- action => nack,
+ action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent),
+ ref => Ref,
result => Result
}),
+ mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid);
ack ->
?tp(resource_worker_reply_after_query, #{
- action => ack,
+ action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent),
+ ref => Ref,
result => Result
}),
+ IsFullBefore = is_inflight_full(InflightTID),
+ IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+ IsAcked andalso PostFn(),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
ok
end.
@@ -868,24 +867,28 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
- {Action, PostFns} = batch_reply_caller_defer_metrics(Id, Result, Batch),
- %% Should always ack async inflight requests that
- %% returned, otherwise the request will get retried. The
- %% caller has just been notified of the failure and should
- %% decide if it wants to retry or not.
- IsFullBefore = is_inflight_full(InflightTID),
- IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
- IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns),
+ {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch),
case Action of
nack ->
+ %% Keep retrying.
?tp(resource_worker_reply_after_query, #{
- action => nack, batch_or_query => Batch, result => Result
+ action => nack,
+ batch_or_query => Batch,
+ ref => Ref,
+ result => Result
}),
+ mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid);
ack ->
?tp(resource_worker_reply_after_query, #{
- action => ack, batch_or_query => Batch, result => Result
+ action => ack,
+ batch_or_query => Batch,
+ ref => Ref,
+ result => Result
}),
+ IsFullBefore = is_inflight_full(InflightTID),
+ IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+ IsAcked andalso PostFn(),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
ok
end.
@@ -919,7 +922,14 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
Q1
end,
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
- ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
+ ?tp(
+ resource_worker_appended_to_queue,
+ #{
+ id => Id,
+ items => Queries,
+ queue_count => queue_count(Q2)
+ }
+ ),
Q2.
%%==============================================================================
@@ -1110,11 +1120,6 @@ do_cancel_inflight_item(Data, Ref) ->
%%==============================================================================
-inc_sent_failed(Id, _HasBeenSent = true) ->
- emqx_resource_metrics:retried_failed_inc(Id);
-inc_sent_failed(Id, _HasBeenSent) ->
- emqx_resource_metrics:failed_inc(Id).
-
inc_sent_success(Id, _HasBeenSent = true) ->
emqx_resource_metrics:retried_success_inc(Id);
inc_sent_success(Id, _HasBeenSent) ->
diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl
index d105b21ef..ea5ee97ca 100644
--- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl
+++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl
@@ -48,6 +48,7 @@ fields("creation_opts") ->
{health_check_interval, fun health_check_interval/1},
{auto_restart_interval, fun auto_restart_interval/1},
{query_mode, fun query_mode/1},
+ {request_timeout, fun request_timeout/1},
{async_inflight_window, fun async_inflight_window/1},
{enable_batch, fun enable_batch/1},
{batch_size, fun batch_size/1},
@@ -80,6 +81,11 @@ query_mode(default) -> async;
query_mode(required) -> false;
query_mode(_) -> undefined.
+request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
+request_timeout(desc) -> ?DESC("request_timeout");
+request_timeout(default) -> <<"15s">>;
+request_timeout(_) -> undefined.
+
enable_batch(type) -> boolean();
enable_batch(required) -> false;
enable_batch(default) -> true;
diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl
index 15d4a3b46..c2b0c5733 100644
--- a/apps/emqx_resource/test/emqx_connector_demo.erl
+++ b/apps/emqx_resource/test/emqx_connector_demo.erl
@@ -259,6 +259,9 @@ counter_loop(
apply_reply(ReplyFun, ok),
?tp(connector_demo_inc_counter_async, #{n => N}),
State#{counter => Num + N};
+ {big_payload, _Payload, ReplyFun} when Status == blocked ->
+ apply_reply(ReplyFun, {error, blocked}),
+ State;
{{FromPid, ReqRef}, {inc, N}} when Status == running ->
%ct:pal("sync counter recv: ~p", [{inc, N}]),
FromPid ! {ReqRef, ok},
@@ -269,6 +272,9 @@ counter_loop(
{{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
+ {{FromPid, ReqRef}, {big_payload, _Payload}} when Status == running ->
+ FromPid ! {ReqRef, ok},
+ State;
{get, ReplyFun} ->
apply_reply(ReplyFun, Num),
State;
diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl
index da140489e..f71dc4bb9 100644
--- a/apps/emqx_resource/test/emqx_resource_SUITE.erl
+++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl
@@ -411,35 +411,18 @@ t_query_counter_async_inflight(_) ->
%% send async query to make the inflight window full
?check_trace(
- begin
- {ok, SRef} = snabbkaffe:subscribe(
- ?match_event(
- #{
- ?snk_kind := resource_worker_appended_to_inflight,
- is_new := true
- }
- ),
- WindowSize,
- _Timeout = 5_000
+ {_, {ok, _}} =
+ ?wait_async_action(
+ inc_counter_in_parallel(WindowSize, ReqOpts),
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
+ 1_000
),
- inc_counter_in_parallel(WindowSize, ReqOpts),
- {ok, _} = snabbkaffe:receive_events(SRef),
- ok
- end,
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end
),
tap_metrics(?LINE),
-
- %% this will block the resource_worker as the inflight window is full now
- {ok, {ok, _}} =
- ?wait_async_action(
- emqx_resource:query(?ID, {inc_counter, 199}),
- #{?snk_kind := resource_worker_flush_but_inflight_full},
- 1_000
- ),
?assertMatch(0, ets:info(Tab0, size)),
tap_metrics(?LINE),
@@ -464,9 +447,9 @@ t_query_counter_async_inflight(_) ->
%% all responses should be received after the resource is resumed.
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
- %% +2 because the tmp_query above will be retried and succeed
- %% this time, and there was the inc 199 request as well.
- WindowSize + 2,
+ %% +1 because the tmp_query above will be retried and succeed
+ %% this time.
+ WindowSize + 1,
_Timeout0 = 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
@@ -504,8 +487,12 @@ t_query_counter_async_inflight(_) ->
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
- ?TRACE_OPTS,
- inc_counter_in_parallel(WindowSize, ReqOpts),
+ {_, {ok, _}} =
+ ?wait_async_action(
+ inc_counter_in_parallel(WindowSize, ReqOpts),
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
+ 1_000
+ ),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
@@ -584,7 +571,7 @@ t_query_counter_async_inflight_batch(_) ->
end,
ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
BatchSize = 2,
- WindowSize = 3,
+ WindowSize = 15,
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
@@ -606,16 +593,12 @@ t_query_counter_async_inflight_batch(_) ->
%% send async query to make the inflight window full
NumMsgs = BatchSize * WindowSize,
?check_trace(
- begin
- {ok, SRef} = snabbkaffe:subscribe(
- ?match_event(#{?snk_kind := call_batch_query_async}),
- WindowSize,
- _Timeout = 60_000
+ {_, {ok, _}} =
+ ?wait_async_action(
+ inc_counter_in_parallel(NumMsgs, ReqOpts),
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
+ 5_000
),
- inc_counter_in_parallel(NumMsgs, ReqOpts),
- {ok, _} = snabbkaffe:receive_events(SRef),
- ok
- end,
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
@@ -674,7 +657,7 @@ t_query_counter_async_inflight_batch(_) ->
%% +1 because the tmp_query above will be retried and succeed
%% this time.
WindowSize + 1,
- _Timeout = 60_000
+ 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
tap_metrics(?LINE),
@@ -695,7 +678,7 @@ t_query_counter_async_inflight_batch(_) ->
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
NumBatches1,
- _Timeout = 60_000
+ 10_000
),
inc_counter_in_parallel(NumMsgs1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
@@ -720,8 +703,12 @@ t_query_counter_async_inflight_batch(_) ->
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
- ?TRACE_OPTS,
- inc_counter_in_parallel(WindowSize, ReqOpts),
+ {_, {ok, _}} =
+ ?wait_async_action(
+ inc_counter_in_parallel(NumMsgs, ReqOpts),
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
+ 5_000
+ ),
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
@@ -734,11 +721,11 @@ t_query_counter_async_inflight_batch(_) ->
%% this will block the resource_worker
ok = emqx_resource:query(?ID, {inc_counter, 1}),
- Sent = NumMsgs + NumMsgs1 + WindowSize,
+ Sent = NumMsgs + NumMsgs1 + NumMsgs,
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
WindowSize,
- _Timeout = 60_000
+ 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1),
@@ -785,10 +772,8 @@ t_healthy_timeout(_) ->
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
#{health_check_interval => 200}
),
- ?assertMatch(
- ?RESOURCE_ERROR(not_connected),
- emqx_resource:query(?ID, get_state)
- ),
+ ?assertError(timeout, emqx_resource:query(?ID, get_state, #{timeout => 1_000})),
+ ?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)),
ok = emqx_resource:remove_local(?ID).
t_healthy(_) ->
@@ -1131,6 +1116,7 @@ t_retry_batch(_Config) ->
ok.
t_delete_and_re_create_with_same_name(_Config) ->
+ NumBufferWorkers = 2,
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
@@ -1139,7 +1125,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
#{
query_mode => sync,
batch_size => 1,
- worker_pool_size => 2,
+ worker_pool_size => NumBufferWorkers,
queue_seg_bytes => 100,
resume_interval => 1_000
}
@@ -1154,19 +1140,21 @@ t_delete_and_re_create_with_same_name(_Config) ->
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
NumRequests = 10,
{ok, SRef} = snabbkaffe:subscribe(
- ?match_event(#{?snk_kind := resource_worker_appended_to_queue}),
- NumRequests,
+ ?match_event(#{?snk_kind := resource_worker_enter_blocked}),
+ NumBufferWorkers,
_Timeout = 5_000
),
%% ensure replayq offloads to disk
Payload = binary:copy(<<"a">>, 119),
lists:foreach(
fun(N) ->
- {error, _} =
- emqx_resource:query(
- ?ID,
- {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
- )
+ spawn_link(fun() ->
+ {error, _} =
+ emqx_resource:query(
+ ?ID,
+ {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
+ )
+ end)
end,
lists:seq(1, NumRequests)
),
@@ -1177,10 +1165,11 @@ t_delete_and_re_create_with_same_name(_Config) ->
tap_metrics(?LINE),
Queuing1 = emqx_resource_metrics:queuing_get(?ID),
Inflight1 = emqx_resource_metrics:inflight_get(?ID),
- ?assertEqual(NumRequests - 1, Queuing1),
- ?assertEqual(1, Inflight1),
+ ?assert(Queuing1 > 0),
+ ?assertEqual(2, Inflight1),
%% now, we delete the resource
+ process_flag(trap_exit, true),
ok = emqx_resource:remove_local(?ID),
?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),
@@ -1275,9 +1264,13 @@ t_retry_sync_inflight(_Config) ->
%% now really make the resource go into `blocked' state.
%% this results in a retriable error when sync.
ok = emqx_resource:simple_sync_query(?ID, block),
- {{error, {recoverable_error, incorrect_status}}, {ok, _}} =
+ TestPid = self(),
+ {_, {ok, _}} =
?wait_async_action(
- emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
+ spawn_link(fun() ->
+ Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
+ TestPid ! {res, Res}
+ end),
#{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
@@ -1287,9 +1280,15 @@ t_retry_sync_inflight(_Config) ->
#{?snk_kind := resource_worker_retry_inflight_succeeded},
ResumeInterval * 3
),
+ receive
+ {res, Res} ->
+ ?assertEqual(ok, Res)
+ after 5_000 ->
+ ct:fail("no response")
+ end,
ok
end,
- [fun ?MODULE:assert_retry_fail_then_succeed_inflight/1]
+ [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
),
ok.
@@ -1312,12 +1311,17 @@ t_retry_sync_inflight_batch(_Config) ->
QueryOpts = #{},
?check_trace(
begin
- %% now really make the resource go into `blocked' state.
- %% this results in a retriable error when sync.
+ %% make the resource go into `blocked' state. this
+ %% results in a retriable error when sync.
ok = emqx_resource:simple_sync_query(?ID, block),
- {{error, {recoverable_error, incorrect_status}}, {ok, _}} =
+ process_flag(trap_exit, true),
+ TestPid = self(),
+ {_, {ok, _}} =
?wait_async_action(
- emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
+ spawn_link(fun() ->
+ Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
+ TestPid ! {res, Res}
+ end),
#{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
@@ -1327,13 +1331,19 @@ t_retry_sync_inflight_batch(_Config) ->
#{?snk_kind := resource_worker_retry_inflight_succeeded},
ResumeInterval * 3
),
+ receive
+ {res, Res} ->
+ ?assertEqual(ok, Res)
+ after 5_000 ->
+ ct:fail("no response")
+ end,
ok
end,
- [fun ?MODULE:assert_retry_fail_then_succeed_inflight/1]
+ [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
),
ok.
-t_dont_retry_async_inflight(_Config) ->
+t_retry_async_inflight(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
@@ -1351,33 +1361,31 @@ t_dont_retry_async_inflight(_Config) ->
QueryOpts = #{},
?check_trace(
begin
- %% block,
- {ok, {ok, _}} =
- ?wait_async_action(
- emqx_resource:query(?ID, block_now),
- #{?snk_kind := resource_worker_enter_blocked},
- ResumeInterval * 2
- ),
+ %% block
+ ok = emqx_resource:simple_sync_query(?ID, block),
- %% then send an async request; that shouldn't be retriable.
+ %% then send an async request; that should be retriable.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
- #{?snk_kind := resource_worker_flush_ack},
+ #{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
- %% will re-enter running because the single request is not retriable
- {ok, _} = ?block_until(
- #{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2
- ),
+ %% will reply with success after the resource is healed
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ emqx_resource:simple_sync_query(?ID, resume),
+ #{?snk_kind := resource_worker_enter_running},
+ ResumeInterval * 2
+ ),
ok
end,
- [fun ?MODULE:assert_no_retry_inflight/1]
+ [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
),
ok.
-t_dont_retry_async_inflight_batch(_Config) ->
+t_retry_async_inflight_batch(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
@@ -1396,29 +1404,27 @@ t_dont_retry_async_inflight_batch(_Config) ->
QueryOpts = #{},
?check_trace(
begin
- %% block,
- {ok, {ok, _}} =
- ?wait_async_action(
- emqx_resource:query(?ID, block_now),
- #{?snk_kind := resource_worker_enter_blocked},
- ResumeInterval * 2
- ),
+ %% block
+ ok = emqx_resource:simple_sync_query(?ID, block),
- %% then send an async request; that shouldn't be retriable.
+ %% then send an async request; that should be retriable.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
- #{?snk_kind := resource_worker_flush_ack},
+ #{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
- %% will re-enter running because the single request is not retriable
- {ok, _} = ?block_until(
- #{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2
- ),
+ %% will reply with success after the resource is healed
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ emqx_resource:simple_sync_query(?ID, resume),
+ #{?snk_kind := resource_worker_enter_running},
+ ResumeInterval * 2
+ ),
ok
end,
- [fun ?MODULE:assert_no_retry_inflight/1]
+ [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
),
ok.
@@ -1529,7 +1535,8 @@ inc_counter_in_parallel(N, Opts0) ->
ct:fail({wait_for_query_timeout, Pid})
end
|| Pid <- Pids
- ].
+ ],
+ ok.
inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
Parent = self(),
@@ -1566,12 +1573,8 @@ tap_metrics(Line) ->
ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
#{counters => C, gauges => G}.
-assert_no_retry_inflight(Trace) ->
- ?assertEqual([], ?of_kind(resource_worker_retry_inflight_failed, Trace)),
- ?assertEqual([], ?of_kind(resource_worker_retry_inflight_succeeded, Trace)),
- ok.
-
-assert_retry_fail_then_succeed_inflight(Trace) ->
+assert_sync_retry_fail_then_succeed_inflight(Trace) ->
+ ct:pal(" ~p", [Trace]),
?assert(
?strict_causality(
#{?snk_kind := resource_worker_flush_nack, ref := _Ref},
@@ -1589,3 +1592,23 @@ assert_retry_fail_then_succeed_inflight(Trace) ->
)
),
ok.
+
+assert_async_retry_fail_then_succeed_inflight(Trace) ->
+ ct:pal(" ~p", [Trace]),
+ ?assert(
+ ?strict_causality(
+ #{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref},
+ #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+ Trace
+ )
+ ),
+ %% not strict causality because it might retry more than once
+ %% before restoring the resource health.
+ ?assert(
+ ?causality(
+ #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+ #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
+ Trace
+ )
+ ),
+ ok.
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
index 4c20705b6..22a5dc859 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
@@ -108,6 +108,7 @@ end_per_group(_Group, _Config) ->
init_per_testcase(TestCase, Config0) when
TestCase =:= t_publish_success_batch
->
+ ct:timetrap({seconds, 30}),
case ?config(batch_size, Config0) of
1 ->
[{skip_due_to_no_batching, true}];
@@ -120,6 +121,7 @@ init_per_testcase(TestCase, Config0) when
[{telemetry_table, Tid} | Config]
end;
init_per_testcase(TestCase, Config0) ->
+ ct:timetrap({seconds, 30}),
{ok, _} = start_echo_http_server(),
delete_all_bridges(),
Tid = install_telemetry_handler(TestCase),
@@ -283,6 +285,7 @@ gcp_pubsub_config(Config) ->
" pool_size = 1\n"
" pipelining = ~b\n"
" resource_opts = {\n"
+ " request_timeout = 500ms\n"
" worker_pool_size = 1\n"
" query_mode = ~s\n"
" batch_size = ~b\n"
@@ -1266,7 +1269,6 @@ t_failure_no_body(Config) ->
t_unrecoverable_error(Config) ->
ResourceId = ?config(resource_id, Config),
- TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
TestPid = self(),
FailureNoBodyHandler =
@@ -1328,26 +1330,14 @@ t_unrecoverable_error(Config) ->
ok
end
),
- wait_telemetry_event(TelemetryTable, failed, ResourceId),
- ExpectedInflightEvents =
- case QueryMode of
- sync -> 1;
- async -> 3
- end,
- wait_telemetry_event(
- TelemetryTable,
- inflight,
- ResourceId,
- #{n_events => ExpectedInflightEvents, timeout => 5_000}
- ),
- %% even waiting, hard to avoid flakiness... simpler to just sleep
- %% a bit until stabilization.
- ct:sleep(200),
+
+ wait_until_gauge_is(queuing, 0, _Timeout = 400),
+ wait_until_gauge_is(inflight, 1, _Timeout = 400),
assert_metrics(
#{
dropped => 0,
- failed => 1,
- inflight => 0,
+ failed => 0,
+ inflight => 1,
matched => 1,
queuing => 0,
retried => 0,