diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index eaae64dd8..28e2c785d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -437,7 +437,7 @@ collect_and_enqueue_query_requests(Request0, Data0) -> end, Requests ), - NewQ = append_queue(Id, Index, Q, Queries), + {_Overflow, NewQ} = append_queue(Id, Index, Q, Queries), Data = Data0#{queue := NewQ}, {Queries, Data}. @@ -1089,18 +1089,22 @@ append_queue(Id, Index, Q, Queries) -> %% because the marshaller will get lost. false = is_binary(hd(Queries)), Q0 = replayq:append(Q, Queries), - Q2 = + {Overflow, Q2} = case replayq:overflow(Q0) of - Overflow when Overflow =< 0 -> - Q0; - Overflow -> - PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, + OverflowBytes when OverflowBytes =< 0 -> + {[], Q0}; + OverflowBytes -> + PopOpts = #{bytes_limit => OverflowBytes, count_limit => 999999999}, {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), emqx_resource_metrics:dropped_queue_full_inc(Id), - ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), - Q1 + ?SLOG(info, #{ + msg => buffer_worker_overflow, + worker_id => Id, + dropped => Dropped + }), + {Items2, Q1} end, emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), ?tp( @@ -1108,10 +1112,11 @@ append_queue(Id, Index, Q, Queries) -> #{ id => Id, items => Queries, - queue_count => queue_count(Q2) + queue_count => queue_count(Q2), + overflow => length(Overflow) } ), - Q2. + {Overflow, Q2}. %%============================================================================== %% the inflight queue for async query