diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 227af26b3..f39c88441 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -40,8 +40,8 @@ end ). -%% NOTE: do not forget to add every used msg to the default value of -%% `log.thorttling.msgs` list. +%% NOTE: do not forget to use atom for msg and add every used msg to +%% the default value of `log.thorttling.msgs` list. -define(SLOG_THROTTLE(Level, Data), ?SLOG_THROTTLE(Level, Data, #{}) ). diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 13b02bb4d..d97dbd167 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -183,8 +183,13 @@ log_result(#{username := Username}, Topic, Action, From, Result) -> } end, case Result of - allow -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"}); - deny -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_denied"}) + allow -> + ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"}); + deny -> + ?SLOG_THROTTLE( + warning, + (LogMeta())#{msg => authorization_permission_denied} + ) end. %% @private Format authorization rules source. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 658bc7bbb..192335a25 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -616,10 +616,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> Msg = packet_to_message(NPacket, NChannel), do_publish(PacketId, Msg, NChannel); {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> - ?SLOG( - info, + ?SLOG_THROTTLE( + warning, #{ - msg => "cannot_publish_to_topic", + msg => cannot_publish_to_topic_due_to_not_authorized, reason => emqx_reason_codes:name(Rc) }, #{topic => Topic} @@ -635,10 +635,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> handle_out(disconnect, Rc, NChannel) end; {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} -> - ?SLOG( - info, + ?SLOG_THROTTLE( + warning, #{ - msg => "cannot_publish_to_topic", + msg => cannot_publish_to_topic_due_to_quota_exceeded, reason => emqx_reason_codes:name(Rc) }, #{topic => Topic} diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl index 93666da1b..ef29d5a79 100644 --- a/apps/emqx/src/emqx_log_throttler.erl +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -50,10 +50,10 @@ -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). --spec allow(logger:level(), string()) -> boolean(). +-spec allow(logger:level(), atom()) -> boolean(). allow(debug, _Msg) -> true; -allow(_Level, Msg) -> +allow(_Level, Msg) when is_atom(Msg) -> Seq = persistent_term:get(?SEQ_ID(Msg), undefined), case Seq of undefined -> @@ -79,8 +79,9 @@ start_link() -> init([]) -> ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST), - TimerRef = schedule_refresh(?TIME_WINDOW_MS), - {ok, #{timer_ref => TimerRef}}. + CurrentPeriodMs = ?TIME_WINDOW_MS, + TimerRef = schedule_refresh(CurrentPeriodMs), + {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}. handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -90,8 +91,7 @@ handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. -handle_info(refresh, State) -> - PeriodMs = ?TIME_WINDOW_MS, +handle_info(refresh, #{current_period_ms := PeriodMs} = State) -> Msgs = ?MSGS_LIST, DroppedStats = lists:foldl( fun(Msg, Acc) -> @@ -112,7 +112,11 @@ handle_info(refresh, State) -> Msgs ), maybe_log_dropped(DroppedStats, PeriodMs), - State1 = State#{timer_ref => schedule_refresh(PeriodMs)}, + NewPeriodMs = ?TIME_WINDOW_MS, + State1 = State#{ + timer_ref => schedule_refresh(NewPeriodMs), + current_period_ms => NewPeriodMs + }, {noreply, State1}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unxpected_info", info => Info}), @@ -143,4 +147,5 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) -> ok. schedule_refresh(PeriodMs) -> + ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}), erlang:send_after(PeriodMs, ?MODULE, refresh). diff --git a/apps/emqx/src/emqx_session_events.erl b/apps/emqx/src/emqx_session_events.erl index 856efac74..ac8dee262 100644 --- a/apps/emqx/src/emqx_session_events.erl +++ b/apps/emqx/src/emqx_session_events.erl @@ -62,10 +62,10 @@ handle_event(ClientInfo, {dropped, Msg, #{reason := queue_full, logctx := Ctx}}) ok = emqx_metrics:inc('delivery.dropped.queue_full'), ok = inc_pd('send_msg.dropped', 1), ok = inc_pd('send_msg.dropped.queue_full', 1), - ?SLOG( - info, + ?SLOG_THROTTLE( + warning, Ctx#{ - msg => "dropped_msg_due_to_mqueue_is_full", + msg => dropped_msg_due_to_mqueue_is_full, payload => Msg#message.payload }, #{topic => Msg#message.topic} diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl index c5689ea24..441ef2d95 100644 --- a/apps/emqx/test/emqx_log_throttler_SUITE.erl +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -23,8 +23,10 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(THROTTLE_MSG, "test_throttle_msg"). --define(THROTTLE_MSG1, "test_throttle_msg1"). +%% Have to use real msgs, as the schema is guarded by enum. +-define(THROTTLE_MSG, authorization_permission_denied). +-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized). +-define(TIME_WINDOW, <<"1s">>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -39,7 +41,7 @@ init_per_suite(Config) -> #{ log => #{ throttling => #{ - time_window => <<"1s">>, msgs => [?THROTTLE_MSG] + time_window => ?TIME_WINDOW, msgs => [?THROTTLE_MSG] } } } @@ -70,6 +72,10 @@ end_per_testcase(t_throttle_add_new_msg, _Config) -> ok = snabbkaffe:stop(), {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), ok; +end_per_testcase(t_update_time_window, _Config) -> + ok = snabbkaffe:stop(), + {ok, _} = emqx_conf:update([log, throttling, time_window], ?TIME_WINDOW, #{}), + ok; end_per_testcase(_TC, _Config) -> ok = snabbkaffe:stop(). @@ -87,7 +93,7 @@ t_throttle(_Config) -> lists:seq(1, 100) ), {ok, _} = ?block_until( - #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 3000 + #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000 ), ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)), @@ -110,7 +116,7 @@ t_throttle_add_new_msg(_Config) -> ?check_trace( begin ?block_until( - #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 3000 + #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000 ), ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)), ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)), @@ -128,11 +134,25 @@ t_throttle_add_new_msg(_Config) -> t_throttle_no_msg(_Config) -> %% Must simply pass with no crashes - ?assert(emqx_log_throttler:allow(warning, "no_test_throttle_msg")), - ?assert(emqx_log_throttler:allow(warning, "no_test_throttle_msg")), + ?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)), + ?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)), timer:sleep(10), ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). +t_update_time_window(_Config) -> + ?check_trace( + begin + ?wait_async_action( + emqx_conf:update([log, throttling, time_window], <<"2s">>, #{}), + #{?snk_kind := log_throttler_sched_refresh, new_period_ms := 2000}, + 5000 + ), + timer:sleep(10), + ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))) + end, + [] + ). + %%-------------------------------------------------------------------- %% internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index f0b0b4a12..19ca74f4e 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -75,6 +75,14 @@ %% 1 million default ports counter -define(DEFAULT_MAX_PORTS, 1024 * 1024). +-define(LOG_THROTTLING_MSGS, [ + authorization_permission_denied, + cannot_publish_to_topic_due_to_not_authorized, + cannot_publish_to_topic_due_to_quota_exceeded, + connection_rejected_due_to_license_limit_reached, + dropped_msg_due_to_mqueue_is_full +]). + %% Callback to upgrade config after loaded from config file but before validation. upgrade_raw_conf(Raw0) -> Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0), @@ -910,7 +918,7 @@ fields("log") -> importance => ?IMPORTANCE_HIGH } )}, - {"throttling", + {throttling, sc(?R_REF("log_throttling"), #{ desc => ?DESC("log_throttling"), importance => ?IMPORTANCE_MEDIUM @@ -1019,22 +1027,22 @@ fields("log_burst_limit") -> ]; fields("log_throttling") -> [ - {"window_time", + {time_window, sc( emqx_schema:duration_s(), #{ default => <<"1m">>, - desc => ?DESC("log_throttling_window_time"), + desc => ?DESC("log_throttling_time_window"), importance => ?IMPORTANCE_MEDIUM } )}, - %% A static list of event ids used in ?SLOG_THROTTLE/3,4 macro. + %% A static list of msgs used in ?SLOG_THROTTLE/2,3 macro. %% For internal (developer) use only. - {"event_ids", + {msgs, sc( - hoconsc:array(atom()), + hoconsc:array(hoconsc:enum(?LOG_THROTTLING_MSGS)), #{ - default => [], + default => ?LOG_THROTTLING_MSGS, importance => ?IMPORTANCE_HIDDEN } )} diff --git a/apps/emqx_license/src/emqx_license.erl b/apps/emqx_license/src/emqx_license.erl index c0fc10b91..fd80cd2c7 100644 --- a/apps/emqx_license/src/emqx_license.erl +++ b/apps/emqx_license/src/emqx_license.erl @@ -85,7 +85,10 @@ check(_ConnInfo, AckProps) -> {ok, #{max_connections := MaxClients}} -> case check_max_clients_exceeded(MaxClients) of true -> - ?SLOG(info, #{msg => "connection_rejected_due_to_license_limit_reached"}), + ?SLOG_THROTTLE( + error, + #{msg => connection_rejected_due_to_license_limit_reached} + ), {stop, {error, ?RC_QUOTA_EXCEEDED}}; false -> {ok, AckProps} diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index ff975a7c2..889bfafa5 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -482,11 +482,11 @@ desc_log_throttling.desc: """Log throttling feature reduces the number of potentially flooding logged events by dropping all but the first event within a configured time window.""" -log_throttling_window_time.desc: -"""A time interval at which log throttling is applied. Defaults to 1 minute.""" +log_throttling_time_window.desc: +"""For throttled messages, only log 1 in each time window.""" -log_throttling_window_time.label: -"""Log Throttling Window Time""" +log_throttling_time_window.label: +"""Log Throttling Time Window""" cluster_dns_record_type.desc: """DNS record type."""