From 4d0febd38000c2a9d5543d99f04e0f8ea907193c Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 11 Jan 2024 23:14:57 +0100 Subject: [PATCH 1/9] docs: fix schema name-version description --- rel/i18n/emqx_plugins_schema.hocon | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rel/i18n/emqx_plugins_schema.hocon b/rel/i18n/emqx_plugins_schema.hocon index b72c87054..af1fb4a0b 100644 --- a/rel/i18n/emqx_plugins_schema.hocon +++ b/rel/i18n/emqx_plugins_schema.hocon @@ -22,9 +22,9 @@ install_dir.label: """Install Directory""" name_vsn.desc: -"""The {name}-{version} of the plugin.
-It should match the plugin application name-version as the for the plugin release package name
-For example: my_plugin-0.1.0.""" +"""The `{name}-{version}` of the plugin.
+It should match the plugin application name-version as plugin release package name
+For example: `my_plugin-0.1.0`.""" name_vsn.label: """Name-Version""" From e9318752e626553c834257f85045eeea3c98a7a1 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 12 Jan 2024 14:58:37 +0100 Subject: [PATCH 2/9] feat: store session unregistration timestamp in emqx_cm_registry table --- apps/emqx/include/emqx_cm.hrl | 9 +- apps/emqx/src/emqx_cm_registry.erl | 150 ++++++++++++++++++--- apps/emqx/src/emqx_cm_registry_cleaner.erl | 139 +++++++++++++++++++ apps/emqx/src/emqx_cm_sup.erl | 2 + apps/emqx/src/emqx_schema.erl | 74 ++++++---- rel/i18n/emqx_schema.hocon | 18 ++- 6 files changed, 342 insertions(+), 50 deletions(-) create mode 100644 apps/emqx/src/emqx_cm_registry_cleaner.erl diff --git a/apps/emqx/include/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl index 6478a6162..d1d195921 100644 --- a/apps/emqx/include/emqx_cm.hrl +++ b/apps/emqx/include/emqx_cm.hrl @@ -23,7 +23,7 @@ -define(CHAN_INFO_TAB, emqx_channel_info). -define(CHAN_LIVE_TAB, emqx_channel_live). -%% Mria/Mnesia Tables for channel management. +%% Mria table for session registraition. -define(CHAN_REG_TAB, emqx_channel_registry). -define(T_KICK, 5_000). @@ -32,4 +32,11 @@ -define(CM_POOL, emqx_cm_pool). +%% Registered sessions. +-record(channel, { + chid :: emqx_types:clientid() | '_', + %% pid field is extended in 5.6.0 to support recording unregistration timestamp. + pid :: pid() | non_neg_integer() | '$1' +}). + -endif. diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 058bb53ec..0236cbc06 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -19,14 +19,9 @@ -behaviour(gen_server). --include("emqx.hrl"). --include("emqx_cm.hrl"). --include("logger.hrl"). --include("types.hrl"). - -export([start_link/0]). --export([is_enabled/0]). +-export([is_enabled/0, is_hist_enabled/0]). -export([ register_channel/1, @@ -50,10 +45,13 @@ do_cleanup_channels/1 ]). --define(REGISTRY, ?MODULE). --define(LOCK, {?MODULE, cleanup_down}). +-include("emqx.hrl"). +-include("emqx_cm.hrl"). +-include("logger.hrl"). +-include("types.hrl"). --record(channel, {chid, pid}). +-define(REGISTRY, ?MODULE). +-define(NODE_DOWN_CLEANUP_LOCK, {?MODULE, cleanup_down}). %% @doc Start the global channel registry. -spec start_link() -> startlink_ret(). @@ -69,6 +67,11 @@ start_link() -> is_enabled() -> emqx:get_config([broker, enable_session_registry]). +%% @doc Is the global session registration history enabled? +-spec is_hist_enabled() -> boolean(). +is_hist_enabled() -> + retain_duration() > 0. + %% @doc Register a global channel. -spec register_channel( emqx_types:clientid() @@ -78,8 +81,11 @@ register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, self()}); register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid)); - false -> ok + true -> + ok = when_hist_enabled(fun() -> delete_hist_d(ClientId) end), + mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid)); + false -> + ok end. %% @doc Unregister a global channel. @@ -91,18 +97,45 @@ unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, self()}); unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)); - false -> ok + true -> + mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)), + %% insert unregistration history after unrestration + ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end); + false -> + ok end. %% @doc Lookup the global channels. -spec lookup_channels(emqx_types:clientid()) -> list(pid()). lookup_channels(ClientId) -> - [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)]. + lists:filtermap( + fun + (#channel{pid = ChanPid}) when is_pid(ChanPid) -> + case is_pid_down(ChanPid) of + true -> + false; + _ -> + {true, ChanPid} + end; + (_) -> + false + end, + mnesia:dirty_read(?CHAN_REG_TAB, ClientId) + ). + +%% Return 'true' or 'false' if it's a local pid. +%% Otherwise return 'unknown'. +is_pid_down(Pid) when node(Pid) =:= node() -> + not erlang:is_process_alive(Pid); +is_pid_down(_) -> + unknown. record(ClientId, ChanPid) -> #channel{chid = ClientId, pid = ChanPid}. +hist(ClientId) -> + #channel{chid = ClientId, pid = now_ts()}. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -158,15 +191,96 @@ code_change(_OldVsn, State, _Extra) -> cleanup_channels(Node) -> global:trans( - {?LOCK, self()}, + {?NODE_DOWN_CLEANUP_LOCK, self()}, fun() -> mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/1, [Node]) end ). do_cleanup_channels(Node) -> - Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], + Pat = [ + { + #channel{pid = '$1', _ = '_'}, + _Match = [{'andalso', {is_pid, '$1'}, {'==', {node, '$1'}, Node}}], + _Return = ['$_'] + } + ], lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)). delete_channel(Chan) -> - mnesia:delete_object(?CHAN_REG_TAB, Chan, write). + mnesia:delete_object(?CHAN_REG_TAB, Chan, write), + ok = when_hist_enabled(fun() -> insert_hist_t(Chan#channel.chid) end). + +%%-------------------------------------------------------------------- +%% History entry operations +%%-------------------------------------------------------------------- + +when_hist_enabled(F) -> + case is_hist_enabled() of + true -> + _ = F(); + false -> + ok + end, + ok. + +%% Insert unregistration history in a transaction when unregistering the last channel for a clientid. +insert_hist_t(ClientId) -> + case delete_hist_t(ClientId) of + true -> + ok; + false -> + mnesia:write(?CHAN_REG_TAB, hist(ClientId), write) + end. + +%% Dirty insert unregistration history. +%% Since dirty opts are used, async pool workers may race deletes and inserts, +%% so there could be more than one history records for a clientid, +%% but it should be eventually consistent after the client re-registers or the periodic cleanup. +insert_hist_d(ClientId) -> + %% delete old hist records first + case delete_hist_d(ClientId) of + true -> + ok; + false -> + mria:dirty_write(?CHAN_REG_TAB, hist(ClientId)) + end. + +%% Current timestamp in seconds. +now_ts() -> + erlang:system_time(seconds). + +%% Delete all history records for a clientid, return true if there is a Pid found. +delete_hist_t(ClientId) -> + fold_hist( + fun(Hist) -> mnesia:delete_object(?CHAN_REG_TAB, Hist, write) end, + mnesia:read(?CHAN_REG_TAB, ClientId, write) + ). + +%% Delete all history records for a clientid, return true if there is a Pid found. +delete_hist_d(ClientId) -> + fold_hist( + fun(Hist) -> mria:dirty_delete_object(?CHAN_REG_TAB, Hist) end, + mnesia:dirty_read(?CHAN_REG_TAB, ClientId) + ). + +%% Fold over the history records, return true if there is a Pid found. +fold_hist(F, List) -> + lists:foldl( + fun(#channel{pid = Ts} = Record, HasPid) -> + case is_integer(Ts) of + true -> + ok = F(Record), + HasPid; + false -> + true + end + end, + false, + List + ). + +%% Return the session registration history retain duration. +-spec retain_duration() -> non_neg_integer(). +retain_duration() -> + emqx:get_config([broker, session_registration_history_retain]). diff --git a/apps/emqx/src/emqx_cm_registry_cleaner.erl b/apps/emqx/src/emqx_cm_registry_cleaner.erl new file mode 100644 index 000000000..41f5bfc6b --- /dev/null +++ b/apps/emqx/src/emqx_cm_registry_cleaner.erl @@ -0,0 +1,139 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This module implements the global session registry history cleaner. +-module(emqx_cm_registry_cleaner). +-behaviour(gen_server). + +-export([start_link/0]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-include("emqx_cm.hrl"). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init(_) -> + case mria_config:whoami() =:= core of + true -> + ok = send_delay_start(), + {ok, #{next_clientid => undefined}}; + false -> + ignore + end. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(start, #{next_clientid := NextClientId} = State) -> + case is_hist_enabled() of + true -> + NewNext = + case cleanup_one_chunk(NextClientId) of + '$end_of_table' -> + ok = send_delay_start(), + undefined; + Id -> + _ = erlang:garbage_collect(), + Id + end, + {noreply, State#{next_clientid := NewNext}}; + false -> + %% if not enabled, dealy and check again + %% because it might be enabled from online config change while waiting + ok = send_delay_start(), + {noreply, State} + end; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +cleanup_one_chunk(NextClientId) -> + Retain = retain_duration(), + Now = now_ts(), + IsExpired = fun(#channel{pid = Ts}) -> + is_integer(Ts) andalso (Ts < Now - Retain) + end, + cleanup_loop(NextClientId, 10000, IsExpired). + +cleanup_loop(ClientId, 0, _IsExpired) -> + ClientId; +cleanup_loop('$end_of_table', _Count, _IsExpired) -> + '$end_of_table'; +cleanup_loop(undefined, Count, IsExpired) -> + cleanup_loop(mnesia:dirty_first(?CHAN_REG_TAB), Count, IsExpired); +cleanup_loop(ClientId, Count, IsExpired) -> + Recods = mnesia:dirty_read(?CHAN_REG_TAB, ClientId), + Next = mnesia:dirty_next(?CHAN_REG_TAB, ClientId), + lists:foreach( + fun(R) -> + case IsExpired(R) of + true -> + mria:dirty_delete_object(?CHAN_REG_TAB, R); + false -> + ok + end + end, + Recods + ), + cleanup_loop(Next, Count - 1, IsExpired). + +is_hist_enabled() -> + retain_duration() > 0. + +%% Return the session registration history retain duration in seconds. +-spec retain_duration() -> non_neg_integer(). +retain_duration() -> + emqx:get_config([broker, session_registration_history_retain]). + +cleanup_delay() -> + Default = timer:minutes(2), + case retain_duration() of + 0 -> + %% prepare for online config change + Default; + RetainSeconds -> + Min = max(1, timer:seconds(RetainSeconds div 4)), + min(Min, Default) + end. + +send_delay_start() -> + Delay = cleanup_delay(), + ok = send_delay_start(Delay). + +send_delay_start(Delay) -> + _ = erlang:send_after(Delay, self(), start), + ok. + +now_ts() -> + erlang:system_time(seconds). diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl index 622921f1d..348c3fca0 100644 --- a/apps/emqx/src/emqx_cm_sup.erl +++ b/apps/emqx/src/emqx_cm_sup.erl @@ -49,6 +49,7 @@ init([]) -> Locker = child_spec(emqx_cm_locker, 5000, worker), CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]), Registry = child_spec(emqx_cm_registry, 5000, worker), + RegistryCleaner = child_spec(emqx_cm_registry_cleaner, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), Children = @@ -58,6 +59,7 @@ init([]) -> Locker, CmPool, Registry, + RegistryCleaner, Manager, DSSessionGCSup ], diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d9171f711..ec4edfb6b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -182,7 +182,7 @@ -define(DEFAULT_MULTIPLIER, 1.5). -define(DEFAULT_BACKOFF, 0.75). -namespace() -> broker. +namespace() -> emqx. tags() -> [<<"EMQX">>]. @@ -230,7 +230,7 @@ roots(high) -> ); roots(medium) -> [ - {"broker", + {broker, sc( ref("broker"), #{ @@ -1347,24 +1347,43 @@ fields("deflate_opts") -> ]; fields("broker") -> [ - {"enable_session_registry", + {enable_session_registry, sc( boolean(), #{ default => true, + importance => ?IMPORTANCE_HIGH, desc => ?DESC(broker_enable_session_registry) } )}, - {"session_locking_strategy", + {session_registration_history_retain, + sc( + duration_s(), + #{ + default => <<"0s">>, + importance => ?IMPORTANCE_LOW, + desc => ?DESC("broker_session_registration_history_retain") + } + )}, + {session_locking_strategy, sc( hoconsc:enum([local, leader, quorum, all]), #{ default => quorum, + importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(broker_session_locking_strategy) } )}, - shared_subscription_strategy(), - {"shared_dispatch_ack_enabled", + %% moved to under mqtt root + {shared_subscription_strategy, + sc( + string(), + #{ + deprecated => {since, "5.1.0"}, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {shared_dispatch_ack_enabled, sc( boolean(), #{ @@ -1374,7 +1393,7 @@ fields("broker") -> desc => ?DESC(broker_shared_dispatch_ack_enabled) } )}, - {"route_batch_clean", + {route_batch_clean, sc( boolean(), #{ @@ -1383,18 +1402,18 @@ fields("broker") -> importance => ?IMPORTANCE_HIDDEN } )}, - {"perf", + {perf, sc( ref("broker_perf"), #{importance => ?IMPORTANCE_HIDDEN} )}, - {"routing", + {routing, sc( ref("broker_routing"), #{importance => ?IMPORTANCE_HIDDEN} )}, %% FIXME: Need new design for shared subscription group - {"shared_subscription_group", + {shared_subscription_group, sc( map(name, ref("shared_subscription_group")), #{ @@ -3640,7 +3659,22 @@ mqtt_general() -> desc => ?DESC(mqtt_shared_subscription) } )}, - shared_subscription_strategy(), + {"shared_subscription_strategy", + sc( + hoconsc:enum([ + random, + round_robin, + round_robin_per_group, + sticky, + local, + hash_topic, + hash_clientid + ]), + #{ + default => round_robin, + desc => ?DESC(mqtt_shared_subscription_strategy) + } + )}, {"exclusive_subscription", sc( boolean(), @@ -3846,24 +3880,6 @@ mqtt_session() -> )} ]. -shared_subscription_strategy() -> - {"shared_subscription_strategy", - sc( - hoconsc:enum([ - random, - round_robin, - round_robin_per_group, - sticky, - local, - hash_topic, - hash_clientid - ]), - #{ - default => round_robin, - desc => ?DESC(broker_shared_subscription_strategy) - } - )}. - default_mem_check_interval() -> case emqx_os_mon:is_os_check_supported() of true -> <<"60s">>; diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 40da8d75e..94ba275f3 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1022,7 +1022,7 @@ fields_ws_opts_supported_subprotocols.desc: fields_ws_opts_supported_subprotocols.label: """Supported subprotocols""" -broker_shared_subscription_strategy.desc: +mqtt_shared_subscription_strategy.desc: """Dispatch strategy for shared subscription. - `random`: Randomly select a subscriber for dispatch; - `round_robin`: Messages from a single publisher are dispatched to subscribers in turn; @@ -1420,7 +1420,21 @@ force_shutdown_enable.label: """Enable `force_shutdown` feature""" broker_enable_session_registry.desc: -"""Enable session registry""" +"""The Global Session Registry is a cluster-wide mechanism designed to maintain the uniqueness of client IDs within the cluster. +Recommendations for Use
+- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconneted to another node in the cluster, the new connection will need to find the old session and take it over. +- Disabling the Feature: Disabling is an option for scenarios when all sessions expire immediately after client is disconnected (i.e. session expiry interval is zero). This can be relevant in certain specialized use cases. + +Advantages of Disabling
+- Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system. +- Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster.""" + +broker_session_registration_history_retain.desc: +"""The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance. +This retained history can be used to monitor how many sessions were registered in the past configured duration. +Note: This config has no effect if `enable_session_registry` is set to `false`.
+Note: If the clients are suing random client IDs, it's not recommended to enable this feature, at least not for a long retain duration.
+Note: When clustered, the lowest (but greater than `0s`) value among the nodes in the cluster will take effect.""" overload_protection_backoff_delay.desc: """The maximum duration of delay for background task execution during high load conditions.""" From 562a2736ae243b23760023186bb5be7aad5e61a2 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 15 Jan 2024 15:58:30 +0100 Subject: [PATCH 3/9] feat: add `broker` root to hot-config schema --- apps/emqx_management/src/emqx_mgmt_api_configs.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index ca1a9fc0b..8db1ef720 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -53,7 +53,8 @@ <<"alarm">>, <<"sys_topics">>, <<"sysmon">>, - <<"log">> + <<"log">>, + <<"broker">> | ?ROOT_KEYS_EE ]). From 509ab6f35a44182deefb6c60339cc923b314729d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 16 Jan 2024 14:19:31 +0100 Subject: [PATCH 4/9] feat(api): add /sessions_count api to count sessions --- apps/emqx/src/emqx_cm_registry.erl | 2 +- ...leaner.erl => emqx_cm_registry_keeper.erl} | 61 +++++++++++++++++-- apps/emqx/src/emqx_cm_sup.erl | 4 +- apps/emqx/src/emqx_schema.erl | 4 +- .../src/emqx_mgmt_api_clients.erl | 35 ++++++++++- rel/i18n/emqx_mgmt_api_clients.hocon | 7 +++ rel/i18n/emqx_schema.hocon | 2 +- 7 files changed, 103 insertions(+), 12 deletions(-) rename apps/emqx/src/{emqx_cm_registry_cleaner.erl => emqx_cm_registry_keeper.erl} (65%) diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 0236cbc06..1fd140388 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -283,4 +283,4 @@ fold_hist(F, List) -> %% Return the session registration history retain duration. -spec retain_duration() -> non_neg_integer(). retain_duration() -> - emqx:get_config([broker, session_registration_history_retain]). + emqx:get_config([broker, session_history_retain]). diff --git a/apps/emqx/src/emqx_cm_registry_cleaner.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl similarity index 65% rename from apps/emqx/src/emqx_cm_registry_cleaner.erl rename to apps/emqx/src/emqx_cm_registry_keeper.erl index 41f5bfc6b..8d697732a 100644 --- a/apps/emqx/src/emqx_cm_registry_cleaner.erl +++ b/apps/emqx/src/emqx_cm_registry_keeper.erl @@ -15,10 +15,13 @@ %%-------------------------------------------------------------------- %% @doc This module implements the global session registry history cleaner. --module(emqx_cm_registry_cleaner). +-module(emqx_cm_registry_keeper). -behaviour(gen_server). --export([start_link/0]). +-export([ + start_link/0, + count/1 +]). %% gen_server callbacks -export([ @@ -30,8 +33,15 @@ code_change/3 ]). +-include_lib("stdlib/include/ms_transform.hrl"). -include("emqx_cm.hrl"). +-define(CACHE_COUNT_THRESHOLD, 1000). +-define(MIN_COUNT_INTERVAL_SECONDS, 5). +-define(CLEANUP_CHUNK_SIZE, 10000). + +-define(IS_HIST_ENABLED(RETAIN), (RETAIN > 0)). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -44,6 +54,45 @@ init(_) -> ignore end. +%% @doc Count the number of sessions. +%% Include sessions which are expired since the given timestamp if `since' is greater than 0. +-spec count(non_neg_integer()) -> non_neg_integer(). +count(Since) -> + Retain = retain_duration(), + Now = now_ts(), + %% Get table size if hist is not enabled or + %% Since is before the earliest possible retention time. + IsCountAll = (not ?IS_HIST_ENABLED(Retain) orelse (Now - Retain >= Since)), + case IsCountAll of + true -> + mnesia:table_info(?CHAN_REG_TAB, size); + false -> + %% make a gen call to avoid many callers doing the same concurrently + gen_server:call(?MODULE, {count, Since}, infinity) + end. + +handle_call({count, Since}, _From, State) -> + {LastCountTime, LastCount} = + case State of + #{last_count_time := T, last_count := C} -> + {T, C}; + _ -> + {0, 0} + end, + Now = now_ts(), + Total = mnesia:table_info(?CHAN_REG_TAB, size), + %% Always count if the table is small enough + %% or when the last count is too old + IsTableSmall = (Total < ?CACHE_COUNT_THRESHOLD), + IsLastCountOld = (Now - LastCountTime > ?MIN_COUNT_INTERVAL_SECONDS), + case IsTableSmall orelse IsLastCountOld of + true -> + Count = do_count(Since), + CountFinishedAt = now_ts(), + {reply, Count, State#{last_count_time => CountFinishedAt, last_count => Count}}; + false -> + {reply, LastCount, State} + end; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -84,7 +133,7 @@ cleanup_one_chunk(NextClientId) -> IsExpired = fun(#channel{pid = Ts}) -> is_integer(Ts) andalso (Ts < Now - Retain) end, - cleanup_loop(NextClientId, 10000, IsExpired). + cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired). cleanup_loop(ClientId, 0, _IsExpired) -> ClientId; @@ -114,7 +163,7 @@ is_hist_enabled() -> %% Return the session registration history retain duration in seconds. -spec retain_duration() -> non_neg_integer(). retain_duration() -> - emqx:get_config([broker, session_registration_history_retain]). + emqx:get_config([broker, session_history_retain]). cleanup_delay() -> Default = timer:minutes(2), @@ -137,3 +186,7 @@ send_delay_start(Delay) -> now_ts() -> erlang:system_time(seconds). + +do_count(Since) -> + Ms = ets:fun2ms(fun(#channel{pid = V}) -> is_pid(V) orelse (is_integer(V) andalso (V >= Since)) end), + ets:select_count(?CHAN_REG_TAB, Ms). diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl index 348c3fca0..3306b7ccd 100644 --- a/apps/emqx/src/emqx_cm_sup.erl +++ b/apps/emqx/src/emqx_cm_sup.erl @@ -49,7 +49,7 @@ init([]) -> Locker = child_spec(emqx_cm_locker, 5000, worker), CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]), Registry = child_spec(emqx_cm_registry, 5000, worker), - RegistryCleaner = child_spec(emqx_cm_registry_cleaner, 5000, worker), + RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), Children = @@ -59,7 +59,7 @@ init([]) -> Locker, CmPool, Registry, - RegistryCleaner, + RegistryKeeper, Manager, DSSessionGCSup ], diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ec4edfb6b..d5989687d 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1356,13 +1356,13 @@ fields("broker") -> desc => ?DESC(broker_enable_session_registry) } )}, - {session_registration_history_retain, + {session_history_retain, sc( duration_s(), #{ default => <<"0s">>, importance => ?IMPORTANCE_LOW, - desc => ?DESC("broker_session_registration_history_retain") + desc => ?DESC("broker_session_history_retain") } )}, {session_locking_strategy, diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index f394ffefa..935c690fe 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -45,7 +45,8 @@ subscribe_batch/2, unsubscribe/2, unsubscribe_batch/2, - set_keepalive/2 + set_keepalive/2, + sessions_count/2 ]). -export([ @@ -96,7 +97,8 @@ paths() -> "/clients/:clientid/subscribe/bulk", "/clients/:clientid/unsubscribe", "/clients/:clientid/unsubscribe/bulk", - "/clients/:clientid/keepalive" + "/clients/:clientid/keepalive", + "/sessions_count" ]. schema("/clients") -> @@ -385,6 +387,30 @@ schema("/clients/:clientid/keepalive") -> ) } } + }; +schema("/sessions_count") -> + #{ + 'operationId' => sessions_count, + get => #{ + description => ?DESC(get_sessions_count), + tags => ?TAGS, + parameters => [ + {since, + hoconsc:mk(non_neg_integer(), #{ + in => query, + required => false, + default => 0, + desc => + <<"Include sessions expired after this time (UNIX Epoch in seconds precesion)">>, + example => 1705391625 + })} + ], + responses => #{ + 200 => hoconsc:mk(binary(), #{ + desc => <<"Number of sessions">> + }) + } + } }. fields(clients) -> @@ -1059,3 +1085,8 @@ client_example() -> <<"recv_cnt">> => 4, <<"recv_msg.qos0">> => 0 }. + +sessions_count(get, #{query_string := QString}) -> + Since = maps:get(<<"since">>, QString, undefined), + Count = emqx_cm_registry_keeper:count(Since), + {200, integer_to_binary(Count)}. diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index 64d4e5279..1e9193df6 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -60,4 +60,11 @@ set_keepalive_seconds.desc: set_keepalive_seconds.label: """Set the online client keepalive by seconds""" +get_sessions_count.desc: +"""Get the number of sessions. By default it returns the number of non-expired sessions. +if `broker.session_history_retain` is set to a duration greater than `0s`, +this API can also count expired sessions by providing the `since` parameter.""" +get_sessions_count.label: +"""Count number of sessions""" + } diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 94ba275f3..4c9f1b83e 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1429,7 +1429,7 @@ Advantages of Disabling
- Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system. - Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster.""" -broker_session_registration_history_retain.desc: +broker_session_history_retain.desc: """The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance. This retained history can be used to monitor how many sessions were registered in the past configured duration. Note: This config has no effect if `enable_session_registry` is set to `false`.
From 87a2368e3797bc98697298cf0b8f599e84507967 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 Jan 2024 17:31:16 +0100 Subject: [PATCH 5/9] feat(metrics): add cluster_session guage --- apps/emqx/src/emqx_cm.erl | 3 ++- apps/emqx/src/emqx_cm_registry.erl | 2 +- apps/emqx/src/emqx_cm_registry_keeper.erl | 6 ++++-- apps/emqx/src/emqx_stats.erl | 8 +++++++- apps/emqx_dashboard/src/emqx_dashboard_monitor.erl | 1 + .../src/emqx_dashboard_monitor_api.erl | 6 ++++++ apps/emqx_management/src/emqx_mgmt.erl | 1 + apps/emqx_management/src/emqx_mgmt_api_nodes.erl | 13 +++++++++++++ apps/emqx_management/src/emqx_mgmt_api_stats.erl | 4 ++++ apps/emqx_prometheus/src/emqx_prometheus.erl | 6 +++++- apps/emqx_telemetry/src/emqx_telemetry.app.src | 2 +- apps/emqx_telemetry/src/emqx_telemetry.erl | 4 ++++ rel/i18n/emqx_mgmt_api_clients.hocon | 9 ++++++--- 13 files changed, 55 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 10cd3d6cc..c33aacf30 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -124,7 +124,8 @@ {?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, {?CHAN_CONN_TAB, 'connections.count', 'connections.max'}, - {?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'} + {?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}, + {?CHAN_REG_TAB, 'cluster_sessions.count', 'cluster_sessions.max'} ]). %% Batch drain diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 1fd140388..4556bce0e 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -99,7 +99,7 @@ unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid case is_enabled() of true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)), - %% insert unregistration history after unrestration + %% insert unregistration history after unregstration ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end); false -> ok diff --git a/apps/emqx/src/emqx_cm_registry_keeper.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl index 8d697732a..f661e203f 100644 --- a/apps/emqx/src/emqx_cm_registry_keeper.erl +++ b/apps/emqx/src/emqx_cm_registry_keeper.erl @@ -172,7 +172,7 @@ cleanup_delay() -> %% prepare for online config change Default; RetainSeconds -> - Min = max(1, timer:seconds(RetainSeconds div 4)), + Min = max(timer:seconds(1), timer:seconds(RetainSeconds) div 4), min(Min, Default) end. @@ -188,5 +188,7 @@ now_ts() -> erlang:system_time(seconds). do_count(Since) -> - Ms = ets:fun2ms(fun(#channel{pid = V}) -> is_pid(V) orelse (is_integer(V) andalso (V >= Since)) end), + Ms = ets:fun2ms(fun(#channel{pid = V}) -> + is_pid(V) orelse (is_integer(V) andalso (V >= Since)) + end), ets:select_count(?CHAN_REG_TAB, Ms). diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 9685823ff..9b5e2a826 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -99,7 +99,11 @@ [ 'sessions.count', %% Maximum Number of Concurrent Sessions - 'sessions.max' + 'sessions.max', + %% Count of Sessions in the cluster + 'cluster_sessions.count', + %% Maximum Number of Sessions in the cluster + 'cluster_sessions.max' ] ). @@ -164,6 +168,8 @@ names() -> emqx_connections_max, emqx_live_connections_count, emqx_live_connections_max, + emqx_cluster_sessions_count, + emqx_cluster_sessions_max, emqx_sessions_count, emqx_sessions_max, emqx_channels_count, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 4891b5293..c8f92de0d 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -415,6 +415,7 @@ getstats(Key) -> stats(connections) -> emqx_stats:getstat('connections.count'); stats(live_connections) -> emqx_stats:getstat('live_connections.count'); +stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count'); stats(topics) -> emqx_stats:getstat('topics.count'); stats(subscriptions) -> emqx_stats:getstat('subscriptions.count'); stats(received) -> emqx_metrics:val('messages.received'); diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index d7e3c094c..c36c6d0f3 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -194,6 +194,12 @@ swagger_desc(live_connections) -> "Connections at the time of sampling." " Can only represent the approximate state" >>; +swagger_desc(cluster_sessions) -> + << + "Total number of sessions in the cluster at the time of sampling. " + "It includes expired sessions when `broker.session_history_retain` is set to a duration greater than `0s`. " + "Can only represent the approximate state" + >>; swagger_desc(received_msg_rate) -> swagger_desc_format("Dropped messages ", per); %swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per); diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 9d4ad8521..67405af05 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -145,6 +145,7 @@ node_info() -> ), connections => ets:info(?CHAN_TAB, size), live_connections => ets:info(?CHAN_LIVE_TAB, size), + cluster_sessions => ets:info(?CHAN_REG_TAB, size), node_status => 'running', uptime => proplists:get_value(uptime, BrokerInfo), version => iolist_to_binary(proplists:get_value(version, BrokerInfo)), diff --git a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl index 9afb74f38..07d775f6e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl @@ -160,6 +160,19 @@ fields(node_info) -> non_neg_integer(), #{desc => <<"Number of clients currently connected to this node">>, example => 0} )}, + {cluster_sessions, + mk( + non_neg_integer(), + #{ + desc => + << + "By default, it includes only those sessions that have not expired. " + "If the `broker.session_history_retain` config is set to a duration greater than `0s`, " + "this count will also include sessions that expired within the specified retain time" + >>, + example => 0 + } + )}, {load1, mk( float(), diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index b57565671..cddc2a7c3 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -89,6 +89,10 @@ fields(node_stats_data) -> stats_schema('delayed.max', <<"Historical maximum number of delayed messages">>), stats_schema('live_connections.count', <<"Number of current live connections">>), stats_schema('live_connections.max', <<"Historical maximum number of live connections">>), + stats_schema('cluster_sessions.count', <<"Number of sessions in the cluster">>), + stats_schema( + 'cluster_sessions.max', <<"Historical maximum number of sessions in the cluster">> + ), stats_schema('retained.count', <<"Number of currently retained messages">>), stats_schema('retained.max', <<"Historical maximum number of retained messages">>), stats_schema('sessions.count', <<"Number of current sessions">>), diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 59241bd02..2942ac485 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -251,7 +251,7 @@ add_collect_family(Name, Data, Callback, Type) -> %% behaviour fetch_from_local_node(Mode) -> - {node(self()), #{ + {node(), #{ stats_data => stats_data(Mode), vm_data => vm_data(Mode), cluster_data => cluster_data(Mode), @@ -308,6 +308,8 @@ emqx_collect(K = emqx_sessions_count, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_sessions_max, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_channels_count, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D)); %% pub/sub stats emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D)); @@ -500,6 +502,8 @@ stats_metric_meta() -> {emqx_sessions_max, gauge, 'sessions.max'}, {emqx_channels_count, gauge, 'channels.count'}, {emqx_channels_max, gauge, 'channels.max'}, + {emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'}, + {emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'}, %% pub/sub stats {emqx_suboptions_count, gauge, 'suboptions.count'}, {emqx_suboptions_max, gauge, 'suboptions.max'}, diff --git a/apps/emqx_telemetry/src/emqx_telemetry.app.src b/apps/emqx_telemetry/src/emqx_telemetry.app.src index 32c2baa91..b4b0ebcfe 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry.app.src +++ b/apps/emqx_telemetry/src/emqx_telemetry.app.src @@ -1,6 +1,6 @@ {application, emqx_telemetry, [ {description, "Report telemetry data for EMQX Opensource edition"}, - {vsn, "0.1.3"}, + {vsn, "0.2.0"}, {registered, [emqx_telemetry_sup, emqx_telemetry]}, {mod, {emqx_telemetry_app, []}}, {applications, [ diff --git a/apps/emqx_telemetry/src/emqx_telemetry.erl b/apps/emqx_telemetry/src/emqx_telemetry.erl index 8842d7a86..2e54549b8 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry.erl +++ b/apps/emqx_telemetry/src/emqx_telemetry.erl @@ -303,6 +303,9 @@ active_plugins() -> num_clients() -> emqx_stats:getstat('live_connections.count'). +num_cluster_sessions() -> + emqx_stats:getstat('cluster_sessions.count'). + messages_sent() -> emqx_metrics:val('messages.sent'). @@ -348,6 +351,7 @@ get_telemetry(State0 = #state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}) {nodes_uuid, nodes_uuid()}, {active_plugins, active_plugins()}, {num_clients, num_clients()}, + {num_cluster_sessions, num_cluster_sessions()}, {messages_received, messages_received()}, {messages_sent, messages_sent()}, {build_info, build_info()}, diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index 1e9193df6..2431c09ec 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -61,9 +61,12 @@ set_keepalive_seconds.label: """Set the online client keepalive by seconds""" get_sessions_count.desc: -"""Get the number of sessions. By default it returns the number of non-expired sessions. -if `broker.session_history_retain` is set to a duration greater than `0s`, -this API can also count expired sessions by providing the `since` parameter.""" +"""Get the total number of sessions in the cluster. +By default, it includes only those sessions that have not expired. +If the `broker.session_history_retain` config is set to a duration greater than 0s, +this count will also include sessions that expired within the specified retain time. +By specifying the `since` parameter, it can return the number of sessions that have expired within the specified time.""" + get_sessions_count.label: """Count number of sessions""" From 330baa6cc39bffca1159e5eb05a834f03d757de9 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 Jan 2024 17:44:49 +0100 Subject: [PATCH 6/9] docs: add changelog for #12326 --- changes/ce/feat-12326.en.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 changes/ce/feat-12326.en.md diff --git a/changes/ce/feat-12326.en.md b/changes/ce/feat-12326.en.md new file mode 100644 index 000000000..bfef51eb8 --- /dev/null +++ b/changes/ce/feat-12326.en.md @@ -0,0 +1,14 @@ +Add session registration history. + +Setting config `broker.session_history_retain` allows EMQX to keep track of expired sessions for the retained period. + +API `GET /api/v5/sessions_count?since=1705682238` can be called to count the cluster-wide sessions which were alive (unexpired) since the provided timestamp (UNIX epoch at seconds precision). + +A new gauge `cluster_sessions` is added to the metrics collection. Exposed to prometheus as + +``` +# TYPE emqx_cluster_sessions_count gauge +emqx_cluster_sessions_count 1234 +``` + +The counter can only be used for an approximate estimation as the collection and calculations are async. From 209331ad33bf05d2ad9c0abee9211109daa47962 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 Jan 2024 20:13:13 +0100 Subject: [PATCH 7/9] test: fix config test --- apps/emqx/test/emqx_config_SUITE.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 4ddebd278..72611c3a6 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -76,8 +76,7 @@ t_fill_default_values(C) when is_list(C) -> <<"trie_compaction">> := true }, <<"route_batch_clean">> := false, - <<"session_locking_strategy">> := <<"quorum">>, - <<"shared_subscription_strategy">> := <<"round_robin">> + <<"session_history_retain">> := <<"0s">> } }, WithDefaults From 38047108a4cea20d512c32a851259372c19aa0fc Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 Jan 2024 21:27:43 +0100 Subject: [PATCH 8/9] test: add test coverage for emqx_cm_registry_keeper module --- apps/emqx/src/emqx_cm_registry_keeper.erl | 8 +- .../test/emqx_cm_registry_keeper_SUITE.erl | 100 ++++++++++++++++++ 2 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl diff --git a/apps/emqx/src/emqx_cm_registry_keeper.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl index f661e203f..1087932df 100644 --- a/apps/emqx/src/emqx_cm_registry_keeper.erl +++ b/apps/emqx/src/emqx_cm_registry_keeper.erl @@ -46,12 +46,12 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init(_) -> - case mria_config:whoami() =:= core of + case mria_config:whoami() =:= replicant of true -> - ok = send_delay_start(), - {ok, #{next_clientid => undefined}}; + ignore; false -> - ignore + ok = send_delay_start(), + {ok, #{next_clientid => undefined}} end. %% @doc Count the number of sessions. diff --git a/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl b/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl new file mode 100644 index 000000000..3dcded1c3 --- /dev/null +++ b/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl @@ -0,0 +1,100 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cm_registry_keeper_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include("emqx_cm.hrl"). + +%%-------------------------------------------------------------------- +%% CT callbacks +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + AppConfig = "broker.session_history_retain = 2s", + Apps = emqx_cth_suite:start( + [{emqx, #{config => AppConfig}}], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_cth_suite:stop(proplists:get_value(apps, Config)). + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, Config) -> + Config. + +t_cleanup_after_retain(_) -> + Pid = spawn(fun() -> + receive + stop -> ok + end + end), + ClientId = <<"clientid">>, + ClientId2 = <<"clientid2">>, + emqx_cm_registry:register_channel({ClientId, Pid}), + emqx_cm_registry:register_channel({ClientId2, Pid}), + ?assertEqual([Pid], emqx_cm_registry:lookup_channels(ClientId)), + ?assertEqual([Pid], emqx_cm_registry:lookup_channels(ClientId2)), + ?assertEqual(2, emqx_cm_registry_keeper:count(0)), + T0 = erlang:system_time(seconds), + exit(Pid, kill), + %% lookup_channel chesk if the channel is still alive + ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)), + ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)), + %% simulate a DOWN message which causes emqx_cm to call clean_down + %% to clean the channels for real + ok = emqx_cm:clean_down({Pid, ClientId}), + ok = emqx_cm:clean_down({Pid, ClientId2}), + ?assertEqual(2, emqx_cm_registry_keeper:count(T0)), + ?assertEqual(2, emqx_cm_registry_keeper:count(0)), + ?retry(_Interval = 1000, _Attempts = 4, begin + ?assertEqual(0, emqx_cm_registry_keeper:count(T0)), + ?assertEqual(0, emqx_cm_registry_keeper:count(0)) + end), + ok. + +%% count is cached when the number of entries is greater than 1000 +t_count_cache(_) -> + Pid = self(), + ClientsCount = 999, + ClientIds = lists:map(fun erlang:integer_to_binary/1, lists:seq(1, ClientsCount)), + Channels = lists:map(fun(ClientId) -> {ClientId, Pid} end, ClientIds), + lists:foreach( + fun emqx_cm_registry:register_channel/1, + Channels + ), + T0 = erlang:system_time(seconds), + ?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(0)), + ?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(T0)), + %% insert another one to trigger the cache threshold + emqx_cm_registry:register_channel({<<"-1">>, Pid}), + ?assertEqual(ClientsCount + 1, emqx_cm_registry_keeper:count(0)), + ?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(T0)), + mnesia:clear_table(?CHAN_REG_TAB), + ok. + +channel(Id, Pid) -> + #channel{chid = Id, pid = Pid}. From f0569d8ae831d97a1f5f3b80ff2a183e94d915f6 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 1 Feb 2024 20:54:03 +0100 Subject: [PATCH 9/9] refactor: use mria:async_dirty to group dirty ops --- apps/emqx/include/emqx_cm.hrl | 2 +- apps/emqx/src/emqx_cm_registry.erl | 52 ++++++++++++------- apps/emqx/src/emqx_cm_registry_keeper.erl | 6 +-- .../test/emqx_cm_registry_keeper_SUITE.erl | 2 +- .../src/emqx_mgmt_api_clients.erl | 4 +- rel/i18n/emqx_schema.hocon | 4 +- 6 files changed, 43 insertions(+), 27 deletions(-) diff --git a/apps/emqx/include/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl index d1d195921..a84a06688 100644 --- a/apps/emqx/include/emqx_cm.hrl +++ b/apps/emqx/include/emqx_cm.hrl @@ -23,7 +23,7 @@ -define(CHAN_INFO_TAB, emqx_channel_info). -define(CHAN_LIVE_TAB, emqx_channel_live). -%% Mria table for session registraition. +%% Mria table for session registration. -define(CHAN_REG_TAB, emqx_channel_registry). -define(T_KICK, 5_000). diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 4556bce0e..683afcb86 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -25,7 +25,9 @@ -export([ register_channel/1, - unregister_channel/1 + register_channel2/1, + unregister_channel/1, + unregister_channel2/1 ]). -export([lookup_channels/1]). @@ -80,14 +82,21 @@ is_hist_enabled() -> register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, self()}); register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> + IsHistEnabled = is_hist_enabled(), case is_enabled() of + true when IsHistEnabled -> + mria:async_dirty(?CM_SHARD, fun ?MODULE:register_channel2/1, [record(ClientId, ChanPid)]); true -> - ok = when_hist_enabled(fun() -> delete_hist_d(ClientId) end), mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. +%% @private +register_channel2(#channel{chid = ClientId} = Record) -> + _ = delete_hist_d(ClientId), + mria:dirty_write(?CHAN_REG_TAB, Record). + %% @doc Unregister a global channel. -spec unregister_channel( emqx_types:clientid() @@ -96,15 +105,23 @@ register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, self()}); unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> + IsHistEnabled = is_hist_enabled(), case is_enabled() of + true when IsHistEnabled -> + mria:async_dirty(?CM_SHARD, fun ?MODULE:unregister_channel2/1, [ + record(ClientId, ChanPid) + ]); true -> - mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)), - %% insert unregistration history after unregstration - ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end); + mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. +%% @private +unregister_channel2(#channel{chid = ClientId} = Record) -> + mria:dirty_delete_object(?CHAN_REG_TAB, Record), + ok = insert_hist_d(ClientId). + %% @doc Lookup the global channels. -spec lookup_channels(emqx_types:clientid()) -> list(pid()). lookup_channels(ClientId) -> @@ -205,24 +222,23 @@ do_cleanup_channels(Node) -> _Return = ['$_'] } ], - lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)). + IsHistEnabled = is_hist_enabled(), + lists:foreach( + fun(Chan) -> delete_channel(IsHistEnabled, Chan) end, + mnesia:select(?CHAN_REG_TAB, Pat, write) + ). -delete_channel(Chan) -> +delete_channel(IsHistEnabled, Chan) -> mnesia:delete_object(?CHAN_REG_TAB, Chan, write), - ok = when_hist_enabled(fun() -> insert_hist_t(Chan#channel.chid) end). + case IsHistEnabled of + true -> + insert_hist_t(Chan#channel.chid); + false -> + ok + end. %%-------------------------------------------------------------------- %% History entry operations -%%-------------------------------------------------------------------- - -when_hist_enabled(F) -> - case is_hist_enabled() of - true -> - _ = F(); - false -> - ok - end, - ok. %% Insert unregistration history in a transaction when unregistering the last channel for a clientid. insert_hist_t(ClientId) -> diff --git a/apps/emqx/src/emqx_cm_registry_keeper.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl index 1087932df..e96fcdd7d 100644 --- a/apps/emqx/src/emqx_cm_registry_keeper.erl +++ b/apps/emqx/src/emqx_cm_registry_keeper.erl @@ -113,7 +113,7 @@ handle_info(start, #{next_clientid := NextClientId} = State) -> end, {noreply, State#{next_clientid := NewNext}}; false -> - %% if not enabled, dealy and check again + %% if not enabled, delay and check again %% because it might be enabled from online config change while waiting ok = send_delay_start(), {noreply, State} @@ -142,7 +142,7 @@ cleanup_loop('$end_of_table', _Count, _IsExpired) -> cleanup_loop(undefined, Count, IsExpired) -> cleanup_loop(mnesia:dirty_first(?CHAN_REG_TAB), Count, IsExpired); cleanup_loop(ClientId, Count, IsExpired) -> - Recods = mnesia:dirty_read(?CHAN_REG_TAB, ClientId), + Records = mnesia:dirty_read(?CHAN_REG_TAB, ClientId), Next = mnesia:dirty_next(?CHAN_REG_TAB, ClientId), lists:foreach( fun(R) -> @@ -153,7 +153,7 @@ cleanup_loop(ClientId, Count, IsExpired) -> ok end end, - Recods + Records ), cleanup_loop(Next, Count - 1, IsExpired). diff --git a/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl b/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl index 3dcded1c3..f3899fb3a 100644 --- a/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl +++ b/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 935c690fe..8965f4633 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -401,7 +401,7 @@ schema("/sessions_count") -> required => false, default => 0, desc => - <<"Include sessions expired after this time (UNIX Epoch in seconds precesion)">>, + <<"Include sessions expired after this time (UNIX Epoch in seconds precision)">>, example => 1705391625 })} ], @@ -1087,6 +1087,6 @@ client_example() -> }. sessions_count(get, #{query_string := QString}) -> - Since = maps:get(<<"since">>, QString, undefined), + Since = maps:get(<<"since">>, QString, 0), Count = emqx_cm_registry_keeper:count(Since), {200, integer_to_binary(Count)}. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 4c9f1b83e..a53104c0c 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1422,7 +1422,7 @@ force_shutdown_enable.label: broker_enable_session_registry.desc: """The Global Session Registry is a cluster-wide mechanism designed to maintain the uniqueness of client IDs within the cluster. Recommendations for Use
-- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconneted to another node in the cluster, the new connection will need to find the old session and take it over. +- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconnected to another node in the cluster, the new connection will need to find the old session and take it over. - Disabling the Feature: Disabling is an option for scenarios when all sessions expire immediately after client is disconnected (i.e. session expiry interval is zero). This can be relevant in certain specialized use cases. Advantages of Disabling
@@ -1433,7 +1433,7 @@ broker_session_history_retain.desc: """The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance. This retained history can be used to monitor how many sessions were registered in the past configured duration. Note: This config has no effect if `enable_session_registry` is set to `false`.
-Note: If the clients are suing random client IDs, it's not recommended to enable this feature, at least not for a long retain duration.
+Note: If the clients are using random client IDs, it's not recommended to enable this feature, at least not for a long retention period.
Note: When clustered, the lowest (but greater than `0s`) value among the nodes in the cluster will take effect.""" overload_protection_backoff_delay.desc: