diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index e664a7dd7..adbba032a 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -447,11 +447,17 @@ merge_to_override_config(RawConf, Opts) -> up_req({remove, _Opts}) -> '$remove'; up_req({{update, Req}, _Opts}) -> Req. -return_change_result(ConfKeyPath, {{update, _Req}, Opts}) -> - #{ - config => emqx_config:get(ConfKeyPath), - raw_config => return_rawconf(ConfKeyPath, Opts) - }; +return_change_result(ConfKeyPath, {{update, Req}, Opts}) -> + case Req =/= emqx_schema:tombstone() of + true -> + #{ + config => emqx_config:get(ConfKeyPath), + raw_config => return_rawconf(ConfKeyPath, Opts) + }; + false -> + %% like remove, nothing to return + #{} + end; return_change_result(_ConfKeyPath, {remove, _Opts}) -> #{}. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 18ddcaba2..e00c79b60 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -22,7 +22,9 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). - +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. %% APIs -export([ list_raw/0, @@ -33,7 +35,8 @@ is_running/1, current_conns/2, max_conns/2, - id_example/0 + id_example/0, + default_max_conn/0 ]). -export([ @@ -61,8 +64,12 @@ -export([certs_dir/2]). -endif. +-type listener_id() :: atom() | binary(). + -define(CONF_KEY_PATH, [listeners, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). +-define(MARK_DEL, marked_for_deletion). +-define(MARK_DEL_BIN, <<"marked_for_deletion">>). -spec id_example() -> atom(). id_example() -> 'tcp:default'. @@ -105,19 +112,22 @@ do_list_raw() -> format_raw_listeners({Type0, Conf}) -> Type = binary_to_atom(Type0), - lists:map( - fun({LName, LConf0}) when is_map(LConf0) -> - Bind = parse_bind(LConf0), - Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), - LConf1 = maps:remove(<<"authentication">>, LConf0), - LConf3 = maps:put(<<"running">>, Running, LConf1), - CurrConn = - case Running of - true -> current_conns(Type, LName, Bind); - false -> 0 - end, - LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3), - {Type0, LName, LConf4} + lists:filtermap( + fun + ({LName, LConf0}) when is_map(LConf0) -> + Bind = parse_bind(LConf0), + Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), + LConf1 = maps:remove(<<"authentication">>, LConf0), + LConf3 = maps:put(<<"running">>, Running, LConf1), + CurrConn = + case Running of + true -> current_conns(Type, LName, Bind); + false -> 0 + end, + LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3), + {true, {Type0, LName, LConf4}}; + ({_LName, _MarkDel}) -> + false end, maps:to_list(Conf) ). @@ -195,7 +205,7 @@ start() -> ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), foreach_listeners(fun start_listener/3). --spec start_listener(atom()) -> ok | {error, term()}. +-spec start_listener(listener_id()) -> ok | {error, term()}. start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). @@ -246,7 +256,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> restart() -> foreach_listeners(fun restart_listener/3). --spec restart_listener(atom()) -> ok | {error, term()}. +-spec restart_listener(listener_id()) -> ok | {error, term()}. restart_listener(ListenerId) -> apply_on_listener(ListenerId, fun restart_listener/3). @@ -271,7 +281,7 @@ stop() -> _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), foreach_listeners(fun stop_listener/3). --spec stop_listener(atom()) -> ok | {error, term()}. +-spec stop_listener(listener_id()) -> ok | {error, term()}. stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). @@ -419,7 +429,9 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> end. %% Update the listeners at runtime -pre_config_update([listeners, Type, Name], {create, NewConf}, undefined) -> +pre_config_update([listeners, Type, Name], {create, NewConf}, V) when + V =:= undefined orelse V =:= ?MARK_DEL_BIN +-> CertsDir = certs_dir(Type, Name), {ok, convert_certs(CertsDir, NewConf)}; pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) -> @@ -434,6 +446,8 @@ pre_config_update([listeners, Type, Name], {update, Request}, RawConf) -> pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) -> NewConf = emqx_utils_maps:deep_merge(RawConf, Updated), {ok, NewConf}; +pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) -> + {ok, ?MARK_DEL}; pre_config_update(_Path, _Request, RawConf) -> {ok, RawConf}. @@ -446,9 +460,9 @@ post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf #{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf}); _ -> ok end; -post_config_update([listeners, _Type, _Name], '$remove', undefined, undefined, _AppEnvs) -> - ok; -post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppEnvs) -> +post_config_update([listeners, Type, Name], Op, _, OldConf, _AppEnvs) when + Op =:= ?MARK_DEL andalso is_map(OldConf) +-> ok = unregister_ocsp_stapling_refresh(Type, Name), case stop_listener(Type, Name, OldConf) of ok -> @@ -611,6 +625,7 @@ format_bind(Bin) when is_binary(Bin) -> listener_id(Type, ListenerName) -> list_to_atom(lists:append([str(Type), ":", str(ListenerName)])). +-spec parse_listener_id(listener_id()) -> {ok, #{type => atom(), name => atom()}} | {error, term()}. parse_listener_id(Id) -> case string:split(str(Id), ":", leading) of [Type, Name] -> @@ -836,3 +851,15 @@ unregister_ocsp_stapling_refresh(Type, Name) -> ListenerId = listener_id(Type, Name), emqx_ocsp_cache:unregister_listener(ListenerId), ok. + +%% There is currently an issue with frontend +%% infinity is not a good value for it, so we use 5m for now +default_max_conn() -> + %% TODO: <<"infinity">> + 5_000_000. + +-ifdef(TEST). +%% since it's a copy-paste. we need to ensure it's the same atom. +ensure_same_atom_test() -> + ?assertEqual(?MARK_DEL, emqx_schema:tombstone()). +-endif. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 418a2db56..e36da0e0a 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -100,6 +100,13 @@ convert_servers/2 ]). +%% tombstone types +-export([ + tombstone/0, + tombstone_map/2, + get_tombstone_map_value_type/1 +]). + -behaviour(hocon_schema). -reflect_type([ @@ -777,45 +784,48 @@ fields("listeners") -> [ {"tcp", sc( - map(name, ref("mqtt_tcp_listener")), + tombstone_map(name, ref("mqtt_tcp_listener")), #{ desc => ?DESC(fields_listeners_tcp), - default => default_listener(tcp), + converter => fun(X, _) -> + ensure_default_listener(X, tcp) + end, required => {false, recursively} } )}, {"ssl", sc( - map(name, ref("mqtt_ssl_listener")), + tombstone_map(name, ref("mqtt_ssl_listener")), #{ desc => ?DESC(fields_listeners_ssl), - default => default_listener(ssl), + converter => fun(X, _) -> ensure_default_listener(X, ssl) end, required => {false, recursively} } )}, {"ws", sc( - map(name, ref("mqtt_ws_listener")), + tombstone_map(name, ref("mqtt_ws_listener")), #{ desc => ?DESC(fields_listeners_ws), - default => default_listener(ws), + converter => fun(X, _) -> ensure_default_listener(X, ws) end, required => {false, recursively} } )}, {"wss", sc( - map(name, ref("mqtt_wss_listener")), + tombstone_map(name, ref("mqtt_wss_listener")), #{ desc => ?DESC(fields_listeners_wss), - default => default_listener(wss), + converter => fun(X, _) -> ensure_default_listener(X, wss) end, required => {false, recursively} } )}, {"quic", sc( - map(name, ref("mqtt_quic_listener")), + tombstone_map(name, ref("mqtt_quic_listener")), #{ desc => ?DESC(fields_listeners_quic), + converter => fun keep_default_tombstone/2, required => {false, recursively} } )} @@ -1943,7 +1953,7 @@ base_listener(Bind) -> sc( hoconsc:union([infinity, pos_integer()]), #{ - default => <<"infinity">>, + default => emqx_listeners:default_max_conn(), desc => ?DESC(base_listener_max_connections) } )}, @@ -3092,20 +3102,12 @@ assert_required_field(Conf, Key, ErrorMessage) -> default_listener(tcp) -> #{ - <<"default">> => - #{ - <<"bind">> => <<"0.0.0.0:1883">>, - <<"max_connections">> => 1024000 - } + <<"bind">> => <<"0.0.0.0:1883">> }; default_listener(ws) -> #{ - <<"default">> => - #{ - <<"bind">> => <<"0.0.0.0:8083">>, - <<"max_connections">> => 1024000, - <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>} - } + <<"bind">> => <<"0.0.0.0:8083">>, + <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>} }; default_listener(SSLListener) -> %% The env variable is resolved in emqx_tls_lib by calling naive_env_interpolate @@ -3120,22 +3122,14 @@ default_listener(SSLListener) -> case SSLListener of ssl -> #{ - <<"default">> => - #{ - <<"bind">> => <<"0.0.0.0:8883">>, - <<"max_connections">> => 512000, - <<"ssl_options">> => SslOptions - } + <<"bind">> => <<"0.0.0.0:8883">>, + <<"ssl_options">> => SslOptions }; wss -> #{ - <<"default">> => - #{ - <<"bind">> => <<"0.0.0.0:8084">>, - <<"max_connections">> => 512000, - <<"ssl_options">> => SslOptions, - <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>} - } + <<"bind">> => <<"0.0.0.0:8084">>, + <<"ssl_options">> => SslOptions, + <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>} } end. @@ -3196,3 +3190,47 @@ special_env(_Name) -> -else. special_env(_Name) -> error. -endif. + +%% The tombstone atom. +tombstone() -> + marked_for_deletion. + +%% Make a map type, the value of which is allowed to be 'marked_for_deletion' +%% 'marked_for_delition' is a special value which means the key is deleted. +%% This is used to support the 'delete' operation in configs, +%% since deleting the key would result in default value being used. +tombstone_map(Name, Type) -> + %% marked_for_deletion must be the last member of the union + %% because we need to first union member to populate the default values + map(Name, ?UNION([Type, tombstone()])). + +%% inverse of mark_del_map +get_tombstone_map_value_type(Schema) -> + %% TODO: violation of abstraction, expose an API in hoconsc + %% hoconsc:map_value_type(Schema) + ?MAP(_Name, Union) = hocon_schema:field_schema(Schema, type), + %% TODO: violation of abstraction, fix hoconsc:union_members/1 + ?UNION(Members) = Union, + Tombstone = tombstone(), + [Type, Tombstone] = hoconsc:union_members(Members), + Type. + +%% Keep the 'default' tombstone, but delete others. +keep_default_tombstone(Map, _Opts) when is_map(Map) -> + maps:filter( + fun(Key, Value) -> + Key =:= <<"default">> orelse Value =/= atom_to_binary(tombstone()) + end, + Map + ); +keep_default_tombstone(Value, _Opts) -> + Value. + +ensure_default_listener(undefined, ListenerType) -> + %% let the schema's default value do its job + #{<<"default">> => default_listener(ListenerType)}; +ensure_default_listener(#{<<"default">> := _} = Map, _ListenerType) -> + keep_default_tombstone(Map, #{}); +ensure_default_listener(Map, ListenerType) -> + NewMap = Map#{<<"default">> => default_listener(ListenerType)}, + keep_default_tombstone(NewMap, #{}). diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 1ecda913d..1b37f652b 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -24,6 +24,7 @@ -export([get_by_node/2, get_by_node/3]). -export([update/3, update/4]). -export([remove/2, remove/3]). +-export([tombstone/2]). -export([reset/2, reset/3]). -export([dump_schema/1, dump_schema/3]). -export([schema_module/0]). @@ -107,6 +108,10 @@ update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() -> update(Node, KeyPath, UpdateReq, Opts) -> emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts). +%% @doc Mark the specified key path as tombstone +tombstone(KeyPath, Opts) -> + update(KeyPath, emqx_schema:tombstone(), Opts). + %% @doc remove all value of key path in cluster-override.conf or local-override.conf. -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index b2ad69997..627ef0719 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -237,8 +237,9 @@ parse_spec_ref(Module, Path, Options) -> erlang:apply(Module, schema, [Path]) %% better error message catch - error:Reason -> - throw({error, #{mfa => {Module, schema, [Path]}, reason => Reason}}) + error:Reason:Stacktrace -> + MoreInfo = #{module => Module, path => Path, reason => Reason}, + erlang:raise(error, MoreInfo, Stacktrace) end, {Specs, Refs} = maps:fold( fun(Method, Meta, {Acc, RefsAcc}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index d7f3ff321..9a338b33f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -293,12 +293,14 @@ listeners_type() -> listeners_info(Opts) -> Listeners = hocon_schema:fields(emqx_schema, "listeners"), lists:map( - fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> - Fields0 = hocon_schema:fields(Mod, Field), + fun({ListenerType, Schema}) -> + Type = emqx_schema:get_tombstone_map_value_type(Schema), + ?R_REF(Mod, StructName) = Type, + Fields0 = hocon_schema:fields(Mod, StructName), Fields1 = lists:keydelete("authentication", 1, Fields0), Fields3 = required_bind(Fields1, Opts), - Ref = listeners_ref(Type, Opts), - TypeAtom = list_to_existing_atom(Type), + Ref = listeners_ref(ListenerType, Opts), + TypeAtom = list_to_existing_atom(ListenerType), #{ ref => ?R_REF(Ref), schema => [ @@ -642,7 +644,7 @@ create(Path, Conf) -> wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))). ensure_remove(Path) -> - wrap(emqx_conf:remove(Path, ?OPTS(cluster))). + wrap(emqx_conf:update(Path, emqx_schema:tombstone(), ?OPTS(cluster))). wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason}; wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason}; diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index cb4e370d3..3a26c948e 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -51,13 +51,13 @@ init_per_group(with_defaults_in_file, Config) -> %% if there is no config file, the such deletion would result in a deletion %% of the default listener. Name = atom_to_list(?MODULE) ++ "-default-listeners", - TmpConfFullPath = inject_tmp_config_content(Name, default_listeners_hcon_text()), + TmpConfFullPath = inject_tmp_config_content(Name, default_listeners_hocon_text()), emqx_mgmt_api_test_util:init_suite([emqx_conf]), [{injected_conf_file, TmpConfFullPath} | Config]. end_per_group(Group, Config) -> - emqx_conf:remove([listeners, tcp, new], #{override_to => cluster}), - emqx_conf:remove([listeners, tcp, new1], #{override_to => local}), + emqx_conf:tombstone([listeners, tcp, new], #{override_to => cluster}), + emqx_conf:tombstone([listeners, tcp, new1], #{override_to => local}), case Group =:= with_defaults_in_file of true -> {_, File} = lists:keyfind(injected_conf_file, 1, Config), @@ -94,16 +94,16 @@ t_max_connection_default({init, Config}) -> t_max_connection_default({'end', Config}) -> ok = file:delete(proplists:get_value(tmp_config_file, Config)); t_max_connection_default(Config) when is_list(Config) -> - %% Check infinity is binary not atom. #{<<"listeners">> := Listeners} = emqx_mgmt_api_listeners:do_list_listeners(), Target = lists:filter( fun(#{<<"id">> := Id}) -> Id =:= 'tcp:max_connection_test' end, Listeners ), - ?assertMatch([#{<<"max_connections">> := <<"infinity">>}], Target), + DefaultMaxConn = emqx_listeners:default_max_conn(), + ?assertMatch([#{<<"max_connections">> := DefaultMaxConn}], Target), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:max_connection_test"]), - ?assertMatch(#{<<"max_connections">> := <<"infinity">>}, request(get, NewPath, [], [])), - emqx_conf:remove([listeners, tcp, max_connection_test], #{override_to => cluster}), + ?assertMatch(#{<<"max_connections">> := DefaultMaxConn}, request(get, NewPath, [], [])), + emqx_conf:tombstone([listeners, tcp, max_connection_test], #{override_to => cluster}), ok. t_list_listeners(Config) when is_list(Config) -> @@ -114,7 +114,7 @@ t_list_listeners(Config) when is_list(Config) -> %% POST /listeners ListenerId = <<"tcp:default">>, - NewListenerId = <<"tcp:new">>, + NewListenerId = <<"tcp:new11">>, OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), @@ -128,7 +128,7 @@ t_list_listeners(Config) when is_list(Config) -> OriginListener2 = maps:remove(<<"id">>, OriginListener), Port = integer_to_binary(?PORT), NewConf = OriginListener2#{ - <<"name">> => <<"new">>, + <<"name">> => <<"new11">>, <<"bind">> => <<"0.0.0.0:", Port/binary>>, <<"max_connections">> := <<"infinity">> }, @@ -298,8 +298,6 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, Port OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), OriginListener = request(get, OriginPath, [], []), - ct:pal("raw conf: ~p~n", [emqx_config:get_raw([listeners])]), - ct:pal("OriginListener:~p", [OriginListener]), %% create with full options ?assertEqual({error, not_found}, is_running(NewListenerId)), @@ -314,7 +312,7 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, Port ?assertEqual(lists:sort(maps:keys(OriginListener)), lists:sort(maps:keys(Create))), Get1 = request(get, NewPath, [], []), ?assertMatch(Create, Get1), - ?assert(is_running(NewListenerId)), + ?assertEqual({true, NewListenerId}, {is_running(NewListenerId), NewListenerId}), %% create with required options MinPath = emqx_mgmt_api_test_util:api_path(["listeners", MinListenerId]), @@ -448,7 +446,7 @@ data_file(Name) -> cert_file(Name) -> data_file(filename:join(["certs", Name])). -default_listeners_hcon_text() -> +default_listeners_hocon_text() -> Sc = #{roots => emqx_schema:fields("listeners")}, Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}), Config = #{<<"listeners">> => Listeners},