diff --git a/apps/emqx/include/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl
index 6478a6162..a84a06688 100644
--- a/apps/emqx/include/emqx_cm.hrl
+++ b/apps/emqx/include/emqx_cm.hrl
@@ -23,7 +23,7 @@
-define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).
-%% Mria/Mnesia Tables for channel management.
+%% Mria table for session registration.
-define(CHAN_REG_TAB, emqx_channel_registry).
-define(T_KICK, 5_000).
@@ -32,4 +32,11 @@
-define(CM_POOL, emqx_cm_pool).
+%% Registered sessions.
+-record(channel, {
+ chid :: emqx_types:clientid() | '_',
+ %% pid field is extended in 5.6.0 to support recording unregistration timestamp.
+ pid :: pid() | non_neg_integer() | '$1'
+}).
+
-endif.
diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl
index d9d5aa941..0cf015141 100644
--- a/apps/emqx/src/emqx_cm.erl
+++ b/apps/emqx/src/emqx_cm.erl
@@ -124,7 +124,8 @@
{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
- {?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}
+ {?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'},
+ {?CHAN_REG_TAB, 'cluster_sessions.count', 'cluster_sessions.max'}
]).
%% Batch drain
diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl
index 058bb53ec..683afcb86 100644
--- a/apps/emqx/src/emqx_cm_registry.erl
+++ b/apps/emqx/src/emqx_cm_registry.erl
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
-%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -19,18 +19,15 @@
-behaviour(gen_server).
--include("emqx.hrl").
--include("emqx_cm.hrl").
--include("logger.hrl").
--include("types.hrl").
-
-export([start_link/0]).
--export([is_enabled/0]).
+-export([is_enabled/0, is_hist_enabled/0]).
-export([
register_channel/1,
- unregister_channel/1
+ register_channel2/1,
+ unregister_channel/1,
+ unregister_channel2/1
]).
-export([lookup_channels/1]).
@@ -50,10 +47,13 @@
do_cleanup_channels/1
]).
--define(REGISTRY, ?MODULE).
--define(LOCK, {?MODULE, cleanup_down}).
+-include("emqx.hrl").
+-include("emqx_cm.hrl").
+-include("logger.hrl").
+-include("types.hrl").
--record(channel, {chid, pid}).
+-define(REGISTRY, ?MODULE).
+-define(NODE_DOWN_CLEANUP_LOCK, {?MODULE, cleanup_down}).
%% @doc Start the global channel registry.
-spec start_link() -> startlink_ret().
@@ -69,6 +69,11 @@ start_link() ->
is_enabled() ->
emqx:get_config([broker, enable_session_registry]).
+%% @doc Is the global session registration history enabled?
+-spec is_hist_enabled() -> boolean().
+is_hist_enabled() ->
+ retain_duration() > 0.
+
%% @doc Register a global channel.
-spec register_channel(
emqx_types:clientid()
@@ -77,11 +82,21 @@ is_enabled() ->
register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, self()});
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
+ IsHistEnabled = is_hist_enabled(),
case is_enabled() of
- true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
- false -> ok
+ true when IsHistEnabled ->
+ mria:async_dirty(?CM_SHARD, fun ?MODULE:register_channel2/1, [record(ClientId, ChanPid)]);
+ true ->
+ mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
+ false ->
+ ok
end.
+%% @private
+register_channel2(#channel{chid = ClientId} = Record) ->
+ _ = delete_hist_d(ClientId),
+ mria:dirty_write(?CHAN_REG_TAB, Record).
+
%% @doc Unregister a global channel.
-spec unregister_channel(
emqx_types:clientid()
@@ -90,19 +105,54 @@ register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid)
unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, self()});
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
+ IsHistEnabled = is_hist_enabled(),
case is_enabled() of
- true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
- false -> ok
+ true when IsHistEnabled ->
+ mria:async_dirty(?CM_SHARD, fun ?MODULE:unregister_channel2/1, [
+ record(ClientId, ChanPid)
+ ]);
+ true ->
+ mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
+ false ->
+ ok
end.
+%% @private
+unregister_channel2(#channel{chid = ClientId} = Record) ->
+ mria:dirty_delete_object(?CHAN_REG_TAB, Record),
+ ok = insert_hist_d(ClientId).
+
%% @doc Lookup the global channels.
-spec lookup_channels(emqx_types:clientid()) -> list(pid()).
lookup_channels(ClientId) ->
- [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)].
+ lists:filtermap(
+ fun
+ (#channel{pid = ChanPid}) when is_pid(ChanPid) ->
+ case is_pid_down(ChanPid) of
+ true ->
+ false;
+ _ ->
+ {true, ChanPid}
+ end;
+ (_) ->
+ false
+ end,
+ mnesia:dirty_read(?CHAN_REG_TAB, ClientId)
+ ).
+
+%% Return 'true' or 'false' if it's a local pid.
+%% Otherwise return 'unknown'.
+is_pid_down(Pid) when node(Pid) =:= node() ->
+ not erlang:is_process_alive(Pid);
+is_pid_down(_) ->
+ unknown.
record(ClientId, ChanPid) ->
#channel{chid = ClientId, pid = ChanPid}.
+hist(ClientId) ->
+ #channel{chid = ClientId, pid = now_ts()}.
+
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@@ -158,15 +208,95 @@ code_change(_OldVsn, State, _Extra) ->
cleanup_channels(Node) ->
global:trans(
- {?LOCK, self()},
+ {?NODE_DOWN_CLEANUP_LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/1, [Node])
end
).
do_cleanup_channels(Node) ->
- Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
- lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)).
+ Pat = [
+ {
+ #channel{pid = '$1', _ = '_'},
+ _Match = [{'andalso', {is_pid, '$1'}, {'==', {node, '$1'}, Node}}],
+ _Return = ['$_']
+ }
+ ],
+ IsHistEnabled = is_hist_enabled(),
+ lists:foreach(
+ fun(Chan) -> delete_channel(IsHistEnabled, Chan) end,
+ mnesia:select(?CHAN_REG_TAB, Pat, write)
+ ).
-delete_channel(Chan) ->
- mnesia:delete_object(?CHAN_REG_TAB, Chan, write).
+delete_channel(IsHistEnabled, Chan) ->
+ mnesia:delete_object(?CHAN_REG_TAB, Chan, write),
+ case IsHistEnabled of
+ true ->
+ insert_hist_t(Chan#channel.chid);
+ false ->
+ ok
+ end.
+
+%%--------------------------------------------------------------------
+%% History entry operations
+
+%% Insert unregistration history in a transaction when unregistering the last channel for a clientid.
+insert_hist_t(ClientId) ->
+ case delete_hist_t(ClientId) of
+ true ->
+ ok;
+ false ->
+ mnesia:write(?CHAN_REG_TAB, hist(ClientId), write)
+ end.
+
+%% Dirty insert unregistration history.
+%% Since dirty ops are used, async pool workers may race deletes and inserts,
+%% so there could be more than one history record for a clientid,
+%% but it should be eventually consistent after the client re-registers or the periodic cleanup runs.
+insert_hist_d(ClientId) ->
+ %% delete old hist records first
+ case delete_hist_d(ClientId) of
+ true ->
+ ok;
+ false ->
+ mria:dirty_write(?CHAN_REG_TAB, hist(ClientId))
+ end.
+
+%% Current timestamp in seconds.
+now_ts() ->
+ erlang:system_time(seconds).
+
+%% Delete all history records for a clientid, return true if there is a Pid found.
+delete_hist_t(ClientId) ->
+ fold_hist(
+ fun(Hist) -> mnesia:delete_object(?CHAN_REG_TAB, Hist, write) end,
+ mnesia:read(?CHAN_REG_TAB, ClientId, write)
+ ).
+
+%% Delete all history records for a clientid, return true if there is a Pid found.
+delete_hist_d(ClientId) ->
+ fold_hist(
+ fun(Hist) -> mria:dirty_delete_object(?CHAN_REG_TAB, Hist) end,
+ mnesia:dirty_read(?CHAN_REG_TAB, ClientId)
+ ).
+
+%% Apply `F' to every history record in `List', i.e. every record whose pid
+%% field holds an integer timestamp. Returns true if the list also contains a
+%% record that is not a history record (a Pid is found), false otherwise.
+fold_hist(F, List) ->
+    Step = fun
+        (#channel{pid = Ts} = Hist, HasPid) when is_integer(Ts) ->
+            ok = F(Hist),
+            HasPid;
+        (#channel{}, _HasPid) ->
+            true
+    end,
+    lists:foldl(Step, false, List).
+
+%% Return the session registration history retain duration.
+-spec retain_duration() -> non_neg_integer().
+retain_duration() ->
+ emqx:get_config([broker, session_history_retain]).
diff --git a/apps/emqx/src/emqx_cm_registry_keeper.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl
new file mode 100644
index 000000000..e96fcdd7d
--- /dev/null
+++ b/apps/emqx/src/emqx_cm_registry_keeper.erl
@@ -0,0 +1,194 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc This module implements the global session registry history cleaner.
+-module(emqx_cm_registry_keeper).
+-behaviour(gen_server).
+
+-export([
+ start_link/0,
+ count/1
+]).
+
+%% gen_server callbacks
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+-include("emqx_cm.hrl").
+
+-define(CACHE_COUNT_THRESHOLD, 1000).
+-define(MIN_COUNT_INTERVAL_SECONDS, 5).
+-define(CLEANUP_CHUNK_SIZE, 10000).
+
+-define(IS_HIST_ENABLED(RETAIN), (RETAIN > 0)).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% The keeper does not run on replicant nodes: `ignore' is returned there.
+%% On any other node the first delayed `start' message is scheduled and the
+%% state begins with no cleanup cursor.
+init(_Args) ->
+    case mria_config:whoami() of
+        replicant ->
+            ignore;
+        _Role ->
+            ok = send_delay_start(),
+            {ok, #{next_clientid => undefined}}
+    end.
+
+%% @doc Count the number of sessions.
+%% `Since' is a timestamp in seconds; sessions which expired at or after it are
+%% included in the count when history is enabled.
+%% When history is disabled, or `Since' is not later than `Now - Retain', the
+%% size of the registry table is returned directly. Otherwise the request is
+%% forwarded to the locally registered keeper process, which may answer from a
+%% short-lived cached count.
+%% NOTE(review): `init/1' returns `ignore' on replicant nodes, so the
+%% `gen_server:call/3' below presumably exits with `noproc' there -- confirm
+%% that this function is only reached on nodes where the keeper is running.
+-spec count(non_neg_integer()) -> non_neg_integer().
+count(Since) ->
+ Retain = retain_duration(),
+ Now = now_ts(),
+ %% Get table size if hist is not enabled or
+ %% Since is before the earliest possible retention time.
+ IsCountAll = (not ?IS_HIST_ENABLED(Retain) orelse (Now - Retain >= Since)),
+ case IsCountAll of
+ true ->
+ mnesia:table_info(?CHAN_REG_TAB, size);
+ false ->
+ %% make a gen call to avoid many callers doing the same concurrently
+ gen_server:call(?MODULE, {count, Since}, infinity)
+ end.
+
+%% Serve `{count, Since}' requests issued by `count/1'.
+%% A fresh count is computed when the table holds fewer than
+%% ?CACHE_COUNT_THRESHOLD records, or when more than
+%% ?MIN_COUNT_INTERVAL_SECONDS seconds have passed since the previous count
+%% finished; otherwise the previously computed count is returned.
+%% NOTE(review): the cached value is not keyed by `Since', so a request with a
+%% different `Since' inside the cache window receives the count computed for
+%% the earlier request.
+%% Any other request is answered with `ok'.
+handle_call({count, Since}, _From, State) ->
+ {LastCountTime, LastCount} =
+ case State of
+ #{last_count_time := T, last_count := C} ->
+ {T, C};
+ _ ->
+ %% nothing counted yet: epoch 0 forces a fresh count below
+ {0, 0}
+ end,
+ Now = now_ts(),
+ Total = mnesia:table_info(?CHAN_REG_TAB, size),
+ %% Always count if the table is small enough
+ %% or when the last count is too old
+ IsTableSmall = (Total < ?CACHE_COUNT_THRESHOLD),
+ IsLastCountOld = (Now - LastCountTime > ?MIN_COUNT_INTERVAL_SECONDS),
+ case IsTableSmall orelse IsLastCountOld of
+ true ->
+ Count = do_count(Since),
+ %% stamp the cache with the time the count finished, not when it started
+ CountFinishedAt = now_ts(),
+ {reply, Count, State#{last_count_time => CountFinishedAt, last_count => Count}};
+ false ->
+ {reply, LastCount, State}
+ end;
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%% Periodic cleanup driver, triggered by the `start' message.
+%%
+%% When history is enabled, one chunk of the registry table is scanned,
+%% resuming from the client ID stored in `next_clientid' (`undefined' means
+%% start from the first key):
+%% - if the scan reached the end of the table, the cursor is reset and the
+%%   next pass is scheduled via `send_delay_start/0';
+%% - otherwise the cursor is saved and the next chunk is requested right away
+%%   by sending `start' to self. Without this self-send nothing would deliver
+%%   another `start' message and the cleanup would stall after the first
+%%   chunk. Going through the mailbox between chunks also lets other queued
+%%   messages (e.g. `{count, Since}' calls) be handled in between.
+%%
+%% When history is disabled, only the next check is scheduled.
+%% Any other message is ignored.
+handle_info(start, #{next_clientid := NextClientId} = State) ->
+    case is_hist_enabled() of
+        true ->
+            NewNext =
+                case cleanup_one_chunk(NextClientId) of
+                    '$end_of_table' ->
+                        ok = send_delay_start(),
+                        undefined;
+                    Id ->
+                        _ = erlang:garbage_collect(),
+                        %% more keys left to scan: continue with the next chunk
+                        _ = erlang:send(self(), start),
+                        Id
+                end,
+            {noreply, State#{next_clientid := NewNext}};
+        false ->
+            %% if not enabled, delay and check again
+            %% because it might be enabled from online config change while waiting
+            ok = send_delay_start(),
+            {noreply, State}
+    end;
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+cleanup_one_chunk(NextClientId) ->
+ Retain = retain_duration(),
+ Now = now_ts(),
+ IsExpired = fun(#channel{pid = Ts}) ->
+ is_integer(Ts) andalso (Ts < Now - Retain)
+ end,
+ cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired).
+
+%% Walk the registry keys starting at `ClientId', visiting at most `Count'
+%% keys and dirty-deleting every record for which `IsExpired' returns true.
+%% Returns the key to resume from once the budget is used up, or
+%% '$end_of_table' when the scan is complete. `undefined' as the starting
+%% key means "begin with the first key of the table".
+cleanup_loop(ClientId, 0, _IsExpired) ->
+    ClientId;
+cleanup_loop('$end_of_table', _Count, _IsExpired) ->
+    '$end_of_table';
+cleanup_loop(undefined, Count, IsExpired) ->
+    First = mnesia:dirty_first(?CHAN_REG_TAB),
+    cleanup_loop(First, Count, IsExpired);
+cleanup_loop(ClientId, Count, IsExpired) ->
+    Records = mnesia:dirty_read(?CHAN_REG_TAB, ClientId),
+    %% look up the successor before deleting anything under the current key
+    Next = mnesia:dirty_next(?CHAN_REG_TAB, ClientId),
+    ok = delete_expired(Records, IsExpired),
+    cleanup_loop(Next, Count - 1, IsExpired).
+
+%% Dirty-delete each record accepted by the `IsExpired' predicate.
+delete_expired([], _IsExpired) ->
+    ok;
+delete_expired([R | Rest], IsExpired) ->
+    case IsExpired(R) of
+        true ->
+            mria:dirty_delete_object(?CHAN_REG_TAB, R);
+        false ->
+            ok
+    end,
+    delete_expired(Rest, IsExpired).
+
+is_hist_enabled() ->
+ retain_duration() > 0.
+
+%% Return the session registration history retain duration in seconds.
+-spec retain_duration() -> non_neg_integer().
+retain_duration() ->
+ emqx:get_config([broker, session_history_retain]).
+
+%% Delay (in milliseconds) before the next cleanup pass: a quarter of the
+%% retain duration, but never below one second nor above two minutes.
+%% A retain duration of 0 (history disabled) yields the two-minute maximum.
+cleanup_delay() ->
+    MaxDelay = timer:minutes(2),
+    case retain_duration() of
+        0 ->
+            %% prepare for online config change
+            MaxDelay;
+        RetainSeconds ->
+            Quarter = timer:seconds(RetainSeconds) div 4,
+            min(max(timer:seconds(1), Quarter), MaxDelay)
+    end.
+
+send_delay_start() ->
+ Delay = cleanup_delay(),
+ ok = send_delay_start(Delay).
+
+send_delay_start(Delay) ->
+ _ = erlang:send_after(Delay, self(), start),
+ ok.
+
+now_ts() ->
+ erlang:system_time(seconds).
+
+%% Count the registry records that are either live channels (the pid field
+%% holds a pid) or history entries whose timestamp is at or after `Since'.
+%% The match specification is generated at compile time by `ets:fun2ms/1'
+%% (via the ms_transform include).
+%% NOTE(review): `ets:select_count/2' is applied directly to ?CHAN_REG_TAB,
+%% which presumably relies on the table having a local ETS (RAM) copy -- confirm.
+do_count(Since) ->
+ Ms = ets:fun2ms(fun(#channel{pid = V}) ->
+ is_pid(V) orelse (is_integer(V) andalso (V >= Since))
+ end),
+ ets:select_count(?CHAN_REG_TAB, Ms).
diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl
index 622921f1d..3306b7ccd 100644
--- a/apps/emqx/src/emqx_cm_sup.erl
+++ b/apps/emqx/src/emqx_cm_sup.erl
@@ -49,6 +49,7 @@ init([]) ->
Locker = child_spec(emqx_cm_locker, 5000, worker),
CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]),
Registry = child_spec(emqx_cm_registry, 5000, worker),
+ RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker),
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
Children =
@@ -58,6 +59,7 @@ init([]) ->
Locker,
CmPool,
Registry,
+ RegistryKeeper,
Manager,
DSSessionGCSup
],
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index d9171f711..d5989687d 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -182,7 +182,7 @@
-define(DEFAULT_MULTIPLIER, 1.5).
-define(DEFAULT_BACKOFF, 0.75).
-namespace() -> broker.
+namespace() -> emqx.
tags() ->
[<<"EMQX">>].
@@ -230,7 +230,7 @@ roots(high) ->
);
roots(medium) ->
[
- {"broker",
+ {broker,
sc(
ref("broker"),
#{
@@ -1347,24 +1347,43 @@ fields("deflate_opts") ->
];
fields("broker") ->
[
- {"enable_session_registry",
+ {enable_session_registry,
sc(
boolean(),
#{
default => true,
+ importance => ?IMPORTANCE_HIGH,
desc => ?DESC(broker_enable_session_registry)
}
)},
- {"session_locking_strategy",
+ {session_history_retain,
+ sc(
+ duration_s(),
+ #{
+ default => <<"0s">>,
+ importance => ?IMPORTANCE_LOW,
+ desc => ?DESC("broker_session_history_retain")
+ }
+ )},
+ {session_locking_strategy,
sc(
hoconsc:enum([local, leader, quorum, all]),
#{
default => quorum,
+ importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(broker_session_locking_strategy)
}
)},
- shared_subscription_strategy(),
- {"shared_dispatch_ack_enabled",
+ %% moved to under mqtt root
+ {shared_subscription_strategy,
+ sc(
+ string(),
+ #{
+ deprecated => {since, "5.1.0"},
+ importance => ?IMPORTANCE_HIDDEN
+ }
+ )},
+ {shared_dispatch_ack_enabled,
sc(
boolean(),
#{
@@ -1374,7 +1393,7 @@ fields("broker") ->
desc => ?DESC(broker_shared_dispatch_ack_enabled)
}
)},
- {"route_batch_clean",
+ {route_batch_clean,
sc(
boolean(),
#{
@@ -1383,18 +1402,18 @@ fields("broker") ->
importance => ?IMPORTANCE_HIDDEN
}
)},
- {"perf",
+ {perf,
sc(
ref("broker_perf"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
- {"routing",
+ {routing,
sc(
ref("broker_routing"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
%% FIXME: Need new design for shared subscription group
- {"shared_subscription_group",
+ {shared_subscription_group,
sc(
map(name, ref("shared_subscription_group")),
#{
@@ -3640,7 +3659,22 @@ mqtt_general() ->
desc => ?DESC(mqtt_shared_subscription)
}
)},
- shared_subscription_strategy(),
+ {"shared_subscription_strategy",
+ sc(
+ hoconsc:enum([
+ random,
+ round_robin,
+ round_robin_per_group,
+ sticky,
+ local,
+ hash_topic,
+ hash_clientid
+ ]),
+ #{
+ default => round_robin,
+ desc => ?DESC(mqtt_shared_subscription_strategy)
+ }
+ )},
{"exclusive_subscription",
sc(
boolean(),
@@ -3846,24 +3880,6 @@ mqtt_session() ->
)}
].
-shared_subscription_strategy() ->
- {"shared_subscription_strategy",
- sc(
- hoconsc:enum([
- random,
- round_robin,
- round_robin_per_group,
- sticky,
- local,
- hash_topic,
- hash_clientid
- ]),
- #{
- default => round_robin,
- desc => ?DESC(broker_shared_subscription_strategy)
- }
- )}.
-
default_mem_check_interval() ->
case emqx_os_mon:is_os_check_supported() of
true -> <<"60s">>;
diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl
index 4954d0a45..b919d472d 100644
--- a/apps/emqx/src/emqx_stats.erl
+++ b/apps/emqx/src/emqx_stats.erl
@@ -99,7 +99,11 @@
[
'sessions.count',
%% Maximum Number of Concurrent Sessions
- 'sessions.max'
+ 'sessions.max',
+ %% Count of Sessions in the cluster
+ 'cluster_sessions.count',
+ %% Maximum Number of Sessions in the cluster
+ 'cluster_sessions.max'
]
).
@@ -164,6 +168,8 @@ names() ->
emqx_connections_max,
emqx_live_connections_count,
emqx_live_connections_max,
+ emqx_cluster_sessions_count,
+ emqx_cluster_sessions_max,
emqx_sessions_count,
emqx_sessions_max,
emqx_channels_count,
diff --git a/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl b/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl
new file mode 100644
index 000000000..f3899fb3a
--- /dev/null
+++ b/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl
@@ -0,0 +1,100 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cm_registry_keeper_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include("emqx_cm.hrl").
+
+%%--------------------------------------------------------------------
+%% CT callbacks
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+ AppConfig = "broker.session_history_retain = 2s",
+ Apps = emqx_cth_suite:start(
+ [{emqx, #{config => AppConfig}}],
+ #{work_dir => emqx_cth_suite:work_dir(Config)}
+ ),
+ [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+ emqx_cth_suite:stop(proplists:get_value(apps, Config)).
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, Config) ->
+ Config.
+
+t_cleanup_after_retain(_) ->
+ Pid = spawn(fun() ->
+ receive
+ stop -> ok
+ end
+ end),
+ ClientId = <<"clientid">>,
+ ClientId2 = <<"clientid2">>,
+ emqx_cm_registry:register_channel({ClientId, Pid}),
+ emqx_cm_registry:register_channel({ClientId2, Pid}),
+ ?assertEqual([Pid], emqx_cm_registry:lookup_channels(ClientId)),
+ ?assertEqual([Pid], emqx_cm_registry:lookup_channels(ClientId2)),
+ ?assertEqual(2, emqx_cm_registry_keeper:count(0)),
+ T0 = erlang:system_time(seconds),
+ exit(Pid, kill),
+ %% lookup_channels checks if the channel is still alive
+ ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
+ ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)),
+ %% simulate a DOWN message which causes emqx_cm to call clean_down
+ %% to clean the channels for real
+ ok = emqx_cm:clean_down({Pid, ClientId}),
+ ok = emqx_cm:clean_down({Pid, ClientId2}),
+ ?assertEqual(2, emqx_cm_registry_keeper:count(T0)),
+ ?assertEqual(2, emqx_cm_registry_keeper:count(0)),
+ ?retry(_Interval = 1000, _Attempts = 4, begin
+ ?assertEqual(0, emqx_cm_registry_keeper:count(T0)),
+ ?assertEqual(0, emqx_cm_registry_keeper:count(0))
+ end),
+ ok.
+
+%% count is cached once the number of entries reaches the cache threshold (1000)
+t_count_cache(_) ->
+ Pid = self(),
+ ClientsCount = 999,
+ ClientIds = lists:map(fun erlang:integer_to_binary/1, lists:seq(1, ClientsCount)),
+ Channels = lists:map(fun(ClientId) -> {ClientId, Pid} end, ClientIds),
+ lists:foreach(
+ fun emqx_cm_registry:register_channel/1,
+ Channels
+ ),
+ T0 = erlang:system_time(seconds),
+ ?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(0)),
+ ?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(T0)),
+ %% insert another one to trigger the cache threshold
+ emqx_cm_registry:register_channel({<<"-1">>, Pid}),
+ ?assertEqual(ClientsCount + 1, emqx_cm_registry_keeper:count(0)),
+ ?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(T0)),
+ mnesia:clear_table(?CHAN_REG_TAB),
+ ok.
+
+channel(Id, Pid) ->
+ #channel{chid = Id, pid = Pid}.
diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl
index 4ddebd278..72611c3a6 100644
--- a/apps/emqx/test/emqx_config_SUITE.erl
+++ b/apps/emqx/test/emqx_config_SUITE.erl
@@ -76,8 +76,7 @@ t_fill_default_values(C) when is_list(C) ->
<<"trie_compaction">> := true
},
<<"route_batch_clean">> := false,
- <<"session_locking_strategy">> := <<"quorum">>,
- <<"shared_subscription_strategy">> := <<"round_robin">>
+ <<"session_history_retain">> := <<"0s">>
}
},
WithDefaults
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl
index 4891b5293..c8f92de0d 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl
@@ -415,6 +415,7 @@ getstats(Key) ->
stats(connections) -> emqx_stats:getstat('connections.count');
stats(live_connections) -> emqx_stats:getstat('live_connections.count');
+stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
stats(topics) -> emqx_stats:getstat('topics.count');
stats(subscriptions) -> emqx_stats:getstat('subscriptions.count');
stats(received) -> emqx_metrics:val('messages.received');
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl
index d7e3c094c..c36c6d0f3 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl
@@ -194,6 +194,12 @@ swagger_desc(live_connections) ->
"Connections at the time of sampling."
" Can only represent the approximate state"
>>;
+swagger_desc(cluster_sessions) ->
+ <<
+ "Total number of sessions in the cluster at the time of sampling. "
+ "It includes expired sessions when `broker.session_history_retain` is set to a duration greater than `0s`. "
+ "Can only represent the approximate state"
+ >>;
swagger_desc(received_msg_rate) ->
swagger_desc_format("Dropped messages ", per);
%swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per);
diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl
index 1995ec9da..a5cf0b584 100644
--- a/apps/emqx_management/src/emqx_mgmt.erl
+++ b/apps/emqx_management/src/emqx_mgmt.erl
@@ -145,6 +145,7 @@ node_info() ->
),
connections => ets:info(?CHAN_TAB, size),
live_connections => ets:info(?CHAN_LIVE_TAB, size),
+ cluster_sessions => ets:info(?CHAN_REG_TAB, size),
node_status => 'running',
uptime => proplists:get_value(uptime, BrokerInfo),
version => iolist_to_binary(proplists:get_value(version, BrokerInfo)),
diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
index f394ffefa..8965f4633 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
@@ -45,7 +45,8 @@
subscribe_batch/2,
unsubscribe/2,
unsubscribe_batch/2,
- set_keepalive/2
+ set_keepalive/2,
+ sessions_count/2
]).
-export([
@@ -96,7 +97,8 @@ paths() ->
"/clients/:clientid/subscribe/bulk",
"/clients/:clientid/unsubscribe",
"/clients/:clientid/unsubscribe/bulk",
- "/clients/:clientid/keepalive"
+ "/clients/:clientid/keepalive",
+ "/sessions_count"
].
schema("/clients") ->
@@ -385,6 +387,30 @@ schema("/clients/:clientid/keepalive") ->
)
}
}
+ };
+schema("/sessions_count") ->
+ #{
+ 'operationId' => sessions_count,
+ get => #{
+ description => ?DESC(get_sessions_count),
+ tags => ?TAGS,
+ parameters => [
+ {since,
+ hoconsc:mk(non_neg_integer(), #{
+ in => query,
+ required => false,
+ default => 0,
+ desc =>
+ <<"Include sessions expired after this time (UNIX Epoch in seconds precision)">>,
+ example => 1705391625
+ })}
+ ],
+ responses => #{
+ 200 => hoconsc:mk(binary(), #{
+ desc => <<"Number of sessions">>
+ })
+ }
+ }
}.
fields(clients) ->
@@ -1059,3 +1085,8 @@ client_example() ->
<<"recv_cnt">> => 4,
<<"recv_msg.qos0">> => 0
}.
+
+sessions_count(get, #{query_string := QString}) ->
+ Since = maps:get(<<"since">>, QString, 0),
+ Count = emqx_cm_registry_keeper:count(Since),
+ {200, integer_to_binary(Count)}.
diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl
index ca1a9fc0b..8db1ef720 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl
@@ -53,7 +53,8 @@
<<"alarm">>,
<<"sys_topics">>,
<<"sysmon">>,
- <<"log">>
+ <<"log">>,
+ <<"broker">>
| ?ROOT_KEYS_EE
]).
diff --git a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl
index 9afb74f38..07d775f6e 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl
@@ -160,6 +160,19 @@ fields(node_info) ->
non_neg_integer(),
#{desc => <<"Number of clients currently connected to this node">>, example => 0}
)},
+ {cluster_sessions,
+ mk(
+ non_neg_integer(),
+ #{
+ desc =>
+ <<
+ "By default, it includes only those sessions that have not expired. "
+ "If the `broker.session_history_retain` config is set to a duration greater than `0s`, "
+ "this count will also include sessions that expired within the specified retain time"
+ >>,
+ example => 0
+ }
+ )},
{load1,
mk(
float(),
diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl
index b57565671..cddc2a7c3 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl
@@ -89,6 +89,10 @@ fields(node_stats_data) ->
stats_schema('delayed.max', <<"Historical maximum number of delayed messages">>),
stats_schema('live_connections.count', <<"Number of current live connections">>),
stats_schema('live_connections.max', <<"Historical maximum number of live connections">>),
+ stats_schema('cluster_sessions.count', <<"Number of sessions in the cluster">>),
+ stats_schema(
+ 'cluster_sessions.max', <<"Historical maximum number of sessions in the cluster">>
+ ),
stats_schema('retained.count', <<"Number of currently retained messages">>),
stats_schema('retained.max', <<"Historical maximum number of retained messages">>),
stats_schema('sessions.count', <<"Number of current sessions">>),
diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl
index 59241bd02..2942ac485 100644
--- a/apps/emqx_prometheus/src/emqx_prometheus.erl
+++ b/apps/emqx_prometheus/src/emqx_prometheus.erl
@@ -251,7 +251,7 @@ add_collect_family(Name, Data, Callback, Type) ->
%% behaviour
fetch_from_local_node(Mode) ->
- {node(self()), #{
+ {node(), #{
stats_data => stats_data(Mode),
vm_data => vm_data(Mode),
cluster_data => cluster_data(Mode),
@@ -308,6 +308,8 @@ emqx_collect(K = emqx_sessions_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_sessions_max, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_channels_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D));
+emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D));
+emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D));
%% pub/sub stats
emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D));
@@ -500,6 +502,8 @@ stats_metric_meta() ->
{emqx_sessions_max, gauge, 'sessions.max'},
{emqx_channels_count, gauge, 'channels.count'},
{emqx_channels_max, gauge, 'channels.max'},
+ {emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'},
+ {emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'},
%% pub/sub stats
{emqx_suboptions_count, gauge, 'suboptions.count'},
{emqx_suboptions_max, gauge, 'suboptions.max'},
diff --git a/apps/emqx_telemetry/src/emqx_telemetry.app.src b/apps/emqx_telemetry/src/emqx_telemetry.app.src
index 32c2baa91..b4b0ebcfe 100644
--- a/apps/emqx_telemetry/src/emqx_telemetry.app.src
+++ b/apps/emqx_telemetry/src/emqx_telemetry.app.src
@@ -1,6 +1,6 @@
{application, emqx_telemetry, [
{description, "Report telemetry data for EMQX Opensource edition"},
- {vsn, "0.1.3"},
+ {vsn, "0.2.0"},
{registered, [emqx_telemetry_sup, emqx_telemetry]},
{mod, {emqx_telemetry_app, []}},
{applications, [
diff --git a/apps/emqx_telemetry/src/emqx_telemetry.erl b/apps/emqx_telemetry/src/emqx_telemetry.erl
index 8842d7a86..2e54549b8 100644
--- a/apps/emqx_telemetry/src/emqx_telemetry.erl
+++ b/apps/emqx_telemetry/src/emqx_telemetry.erl
@@ -303,6 +303,9 @@ active_plugins() ->
num_clients() ->
emqx_stats:getstat('live_connections.count').
+num_cluster_sessions() ->
+ emqx_stats:getstat('cluster_sessions.count').
+
messages_sent() ->
emqx_metrics:val('messages.sent').
@@ -348,6 +351,7 @@ get_telemetry(State0 = #state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID})
{nodes_uuid, nodes_uuid()},
{active_plugins, active_plugins()},
{num_clients, num_clients()},
+ {num_cluster_sessions, num_cluster_sessions()},
{messages_received, messages_received()},
{messages_sent, messages_sent()},
{build_info, build_info()},
diff --git a/changes/ce/feat-12326.en.md b/changes/ce/feat-12326.en.md
new file mode 100644
index 000000000..bfef51eb8
--- /dev/null
+++ b/changes/ce/feat-12326.en.md
@@ -0,0 +1,14 @@
+Add session registration history.
+
+Setting config `broker.session_history_retain` allows EMQX to keep track of expired sessions for the retained period.
+
+API `GET /api/v5/sessions_count?since=1705682238` can be called to count the cluster-wide sessions which were alive (unexpired) since the provided timestamp (UNIX epoch at seconds precision).
+
+A new gauge `cluster_sessions` is added to the metrics collection. Exposed to prometheus as
+
+```
+# TYPE emqx_cluster_sessions_count gauge
+emqx_cluster_sessions_count 1234
+```
+
+The counter can only be used for an approximate estimation as the collection and calculations are async.
diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon
index 64d4e5279..2431c09ec 100644
--- a/rel/i18n/emqx_mgmt_api_clients.hocon
+++ b/rel/i18n/emqx_mgmt_api_clients.hocon
@@ -60,4 +60,14 @@ set_keepalive_seconds.desc:
set_keepalive_seconds.label:
"""Set the online client keepalive by seconds"""
+get_sessions_count.desc:
+"""Get the total number of sessions in the cluster.
+By default, it includes only those sessions that have not expired.
+If the `broker.session_history_retain` config is set to a duration greater than 0s,
+this count will also include sessions that expired within the specified retain time.
+By specifying the `since` parameter, it can return the number of sessions that have expired within the specified time."""
+
+get_sessions_count.label:
+"""Count number of sessions"""
+
}
diff --git a/rel/i18n/emqx_plugins_schema.hocon b/rel/i18n/emqx_plugins_schema.hocon
index b72c87054..af1fb4a0b 100644
--- a/rel/i18n/emqx_plugins_schema.hocon
+++ b/rel/i18n/emqx_plugins_schema.hocon
@@ -22,9 +22,9 @@ install_dir.label:
"""Install Directory"""
name_vsn.desc:
-"""The {name}-{version} of the plugin.
-It should match the plugin application name-version as the for the plugin release package name
-For example: my_plugin-0.1.0."""
+"""The `{name}-{version}` of the plugin.
+It should match the plugin application name-version used as the plugin release package name.
+For example: `my_plugin-0.1.0`."""
name_vsn.label:
"""Name-Version"""
diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon
index 40da8d75e..a53104c0c 100644
--- a/rel/i18n/emqx_schema.hocon
+++ b/rel/i18n/emqx_schema.hocon
@@ -1022,7 +1022,7 @@ fields_ws_opts_supported_subprotocols.desc:
fields_ws_opts_supported_subprotocols.label:
"""Supported subprotocols"""
-broker_shared_subscription_strategy.desc:
+mqtt_shared_subscription_strategy.desc:
"""Dispatch strategy for shared subscription.
- `random`: Randomly select a subscriber for dispatch;
- `round_robin`: Messages from a single publisher are dispatched to subscribers in turn;
@@ -1420,7 +1420,21 @@ force_shutdown_enable.label:
"""Enable `force_shutdown` feature"""
broker_enable_session_registry.desc:
-"""Enable session registry"""
+"""The Global Session Registry is a cluster-wide mechanism designed to maintain the uniqueness of client IDs within the cluster.
+Recommendations for Use
+- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconnected to another node in the cluster, the new connection will need to find the old session and take it over.
+- Disabling the Feature: Disabling is an option for scenarios when all sessions expire immediately after client is disconnected (i.e. session expiry interval is zero). This can be relevant in certain specialized use cases.
+
+Advantages of Disabling
+- Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system.
+- Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster."""
+
+broker_session_history_retain.desc:
+"""The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact performance.
+This retained history can be used to monitor how many sessions were registered in the past configured duration.
+Note: This config has no effect if `enable_session_registry` is set to `false`.
+Note: If the clients are using random client IDs, it's not recommended to enable this feature, at least not for a long retention period.
+Note: When clustered, the lowest (but greater than `0s`) value among the nodes in the cluster will take effect."""
overload_protection_backoff_delay.desc:
"""The maximum duration of delay for background task execution during high load conditions."""