From 4b44fda16bb53ccf930c8fa11873fafa7040a586 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 23 Jun 2022 10:13:48 -0300 Subject: [PATCH 1/6] fix(hooks): allow `client.subscribe` hook to reject subscriptions Port of https://github.com/emqx/emqx/pull/8288 --- apps/emqx/src/emqx.appup.src | 6 ++++-- apps/emqx/src/emqx_channel.erl | 19 ++++++++++++------- apps/emqx/test/emqx_broker_SUITE.erl | 22 ++++++++++++++++++++++ 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx.appup.src b/apps/emqx/src/emqx.appup.src index 2c54bb0cb..2d685ac69 100644 --- a/apps/emqx/src/emqx.appup.src +++ b/apps/emqx/src/emqx.appup.src @@ -2,12 +2,14 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"5.0.0", - [{load_module,emqx_schema,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_schema,brutal_purge,soft_purge,[]}, {load_module,emqx_release,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {<<".*">>,[]}], [{"5.0.0", - [{load_module,emqx_schema,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_schema,brutal_purge,soft_purge,[]}, {load_module,emqx_release,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index ffea58fbd..29152a447 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -513,12 +513,6 @@ handle_in( true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); false -> - Replace = fun - _Fun(TupleList, [Tuple = {Key, _Value} | More]) -> - _Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More); - _Fun(TupleList, []) -> - TupleList - end, TopicFilters2 = [TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0], TopicFilters3 = run_hooks( 'client.subscribe', @@ -530,7 +524,18 @@ handle_in( Properties, Channel ), - TupleTopicFilters2 = Replace(TupleTopicFilters0, TupleTopicFilters1), + TupleTopicFilters2 = + lists:foldl( + fun + ({{Topic, Opts = #{delete := true}}, _QoS}, Acc) -> + Key = {Topic, maps:without([delete], Opts)}, + lists:keydelete(Key, 1, Acc); + (Tuple = {Key, _Value}, Acc) -> + lists:keyreplace(Key, 1, Acc, Tuple) + end, + TupleTopicFilters0, + TupleTopicFilters1 + ), ReasonCodes2 = [ ReasonCode || {_TopicFilter, ReasonCode} <- TupleTopicFilters2 diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index 41ff43311..fc9f8d192 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -715,6 +715,24 @@ t_connack_auth_error(Config) when is_list(Config) -> ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')), ok. +t_handle_in_empty_client_subscribe_hook({init, Config}) -> + Hook = {?MODULE, client_subscribe_delete_all_hook, []}, + ok = emqx_hooks:put('client.subscribe', Hook, _Priority = 100), + Config; +t_handle_in_empty_client_subscribe_hook({'end', _Config}) -> + emqx_hooks:del('client.subscribe', {?MODULE, client_subscribe_delete_all_hook}), + ok; +t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) -> + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + try + {ok, _, RCs} = emqtt:subscribe(C, <<"t">>), + ?assertEqual([], RCs), + ok + after + emqtt:disconnect(C) + end. + wait_for_events(Action, Kinds) -> wait_for_events(Action, Kinds, 500). @@ -771,3 +789,7 @@ recv_msgs(Count, Msgs) -> after 100 -> Msgs end. + +client_subscribe_delete_all_hook(_ClientInfo, _Username, TopicFilter) -> + EmptyFilters = [{T, Opts#{delete => true}} || {T, Opts} <- TopicFilter], + {stop, EmptyFilters}. From 5dec0be6e45a6a4a6b9e6c610c288895a1747b74 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sun, 26 Jun 2022 14:13:14 +0200 Subject: [PATCH 2/6] refactor(emqx_schema): remove union wrapper around array of maps it's previously a union of array (of maps) and map. since the second union member has been removed there is no point of keeping the union wrapper --- apps/emqx/src/emqx_schema.erl | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index f0655d438..c38ef6111 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -2216,29 +2216,29 @@ str(B) when is_binary(B) -> str(S) when is_list(S) -> S. -authentication(Type) -> +authentication(Which) -> Desc = - case Type of + case Which of global -> ?DESC(global_authentication); listener -> ?DESC(listener_authentication) end, - %% authentication schema is lazy to make it more 'plugable' - %% the type checks are done in emqx_auth application when it boots. - %% and in emqx_authentication_config module for runtime changes. - Default = hoconsc:lazy(hoconsc:union([hoconsc:array(typerefl:map())])), - %% as the type is lazy, the runtime module injection + %% The runtime module injection %% from EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY %% is for now only affecting document generation. %% maybe in the future, we can find a more straightforward way to support %% * document generation (at compile time) %% * type checks before boot (in bin/emqx config generation) %% * type checks at runtime (when changing configs via management API) + Type0 = + case persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, undefined) of + undefined -> hoconsc:array(typerefl:map()); + Module -> Module:root_type() + end, + %% It is a lazy type because when handing runtime update requests + %% the config is not checked by emqx_schema, but by the injected schema + Type = hoconsc:lazy(Type0), #{ - type => - case persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, undefined) of - undefined -> Default; - Module -> hoconsc:lazy(Module:root_type()) - end, + type => Type, desc => Desc }. From 55558ccfbdefe22009a848c703d341f613863c2e Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sun, 26 Jun 2022 14:16:08 +0200 Subject: [PATCH 3/6] chore: allow lazy type environment variable override --- apps/emqx/src/emqx_config.erl | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index ab98ded69..236b8b1a9 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -144,7 +144,7 @@ get_root([RootName | _]) -> %% @doc For the given path, get raw root value enclosed in a single-key map. %% key is ensured to be binary. get_root_raw([RootName | _]) -> - #{bin(RootName) => do_get(?RAW_CONF, [RootName], #{})}. + #{bin(RootName) => do_get_raw([RootName], #{})}. %% @doc Get a config value for the given path. %% The path should at least include root config name. @@ -173,7 +173,7 @@ find(KeyPath) -> {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. find_raw([]) -> Ref = make_ref(), - case do_get(?RAW_CONF, [], Ref) of + case do_get_raw([], Ref) of Ref -> {not_found, []}; Res -> {ok, Res} end; @@ -281,10 +281,10 @@ get_default_value([RootName | _] = KeyPath) -> end. -spec get_raw(emqx_map_lib:config_key_path()) -> term(). -get_raw(KeyPath) -> hocon_tconf:remove_env_meta(do_get(?RAW_CONF, KeyPath)). +get_raw(KeyPath) -> do_get_raw(KeyPath). -spec get_raw(emqx_map_lib:config_key_path(), term()) -> term(). -get_raw(KeyPath, Default) -> hocon_tconf:remove_env_meta(do_get(?RAW_CONF, KeyPath, Default)). +get_raw(KeyPath, Default) -> do_get_raw(KeyPath, Default). -spec put_raw(map()) -> ok. put_raw(Config) -> @@ -398,11 +398,11 @@ include_dirs() -> [filename:join(emqx:data_dir(), "configs")]. merge_envs(SchemaMod, RawConf) -> - %% TODO: evil, remove, required should be declared in schema Opts = #{ required => false, format => map, - apply_override_envs => true + apply_override_envs => true, + check_lazy => true }, hocon_tconf:merge_env_overrides(SchemaMod, RawConf, all, Opts). @@ -571,6 +571,12 @@ load_hocon_file(FileName, LoadType) -> #{} end. +do_get_raw(Path) -> + hocon_tconf:remove_env_meta(do_get(?RAW_CONF, Path)). + +do_get_raw(Path, Default) -> + hocon_tconf:remove_env_meta(do_get(?RAW_CONF, Path, Default)). + do_get(Type, KeyPath) -> Ref = make_ref(), Res = do_get(Type, KeyPath, Ref), From 130d1f7d9cc27f7e8b1719fbbad754fa5713e2d0 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Tue, 28 Jun 2022 10:11:52 +0800 Subject: [PATCH 4/6] chore: choose core node's max tnxid as source conf --- apps/emqx_conf/src/emqx_conf_app.erl | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 61c56a398..7b5f5c3cb 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -140,10 +140,14 @@ copy_override_conf_from_core_node() -> end; _ -> SortFun = fun( - {ok, #{wall_clock := W1}}, - {ok, #{wall_clock := W2}} + {ok, #{wall_clock := W1, tnx_id := TnxId1}}, + {ok, #{wall_clock := W2, tnx_id := TnxId2}} ) -> - W1 > W2 + if + TnxId1 > TnxId2 -> true; + TnxId1 =:= TnxId2 -> W1 > W2; + true -> false + end end, [{ok, Info} | _] = lists:sort(SortFun, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, From 6372456211abc7e4493189230e51248b7676b0ed Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Tue, 28 Jun 2022 10:30:16 +0800 Subject: [PATCH 5/6] chore: move vm.args.* to etc/ --- apps/emqx/etc/emqx_edge/vm.args | 125 ------------------ .../etc/{emqx_cloud/vm.args => vm.args.cloud} | 0 mix.exs | 2 +- rebar.config.erl | 2 +- 4 files changed, 2 insertions(+), 127 deletions(-) delete mode 100644 apps/emqx/etc/emqx_edge/vm.args rename apps/emqx/etc/{emqx_cloud/vm.args => vm.args.cloud} (100%) diff --git a/apps/emqx/etc/emqx_edge/vm.args b/apps/emqx/etc/emqx_edge/vm.args deleted file mode 100644 index f9fcd675d..000000000 --- a/apps/emqx/etc/emqx_edge/vm.args +++ /dev/null @@ -1,125 +0,0 @@ -###################################################################### -## Erlang VM Args -###################################################################### - -## NOTE: -## -## Arguments configured in this file might be overridden by configs from `emqx.conf`. -## -## Some basic VM arguments are to be configured in `emqx.conf`, -## such as `node.name` for `-name` and `node.cooke` for `-setcookie`. - -## Sets the maximum number of simultaneously existing processes for this system. -+P 16384 -## Sets the maximum number of simultaneously existing ports for this system. -+Q 4096 - -## Sets the maximum number of ETS tables -+e 512 - -## Sets the maximum number of atoms the virtual machine can handle. -+t 262144 - -## Set the location of crash dumps --env ERL_CRASH_DUMP {{ platform_log_dir }}/crash.dump - -## Set how many times generational garbages collections can be done without -## forcing a fullsweep collection. --env ERL_FULLSWEEP_AFTER 0 - -## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive -## (Disabled by default..use with caution!) --heart - -## Specify the erlang distributed protocol. -## Can be one of: inet_tcp, inet6_tcp, inet_tls -#-proto_dist inet_tcp - -## The shell is started in a restricted mode. -## In this mode, the shell evaluates a function call only if allowed. -## Prevent user from accidentally calling a function from the prompt that could harm a running system. --stdlib restricted_shell emqx_restricted_shell - -## Specify SSL Options in the file if using SSL for Erlang Distribution. -## Used only when -proto_dist set to inet_tls -#-ssl_dist_optfile {{ platform_etc_dir }}/ssl_dist.conf - -## Specifies the net_kernel tick time in seconds. -## This is the approximate time a connected node may be unresponsive until -## it is considered down and thereby disconnected. -#-kernel net_ticktime 60 - -## Sets the distribution buffer busy limit (dist_buf_busy_limit). -+zdbbl 1024 - -## Sets default scheduler hint for port parallelism. -+spp false - -## Sets the number of threads in async thread pool. Valid range is 0-1024. -## Increase the parameter if there are many simultaneous file I/O operations. -+A 1 - -## Sets the default heap size of processes to the size Size. -#+hms 233 - -## Sets the default binary virtual heap size of processes to the size Size. -#+hmbs 46422 - -## Sets the default maximum heap size of processes to the size Size. -## Defaults to 0, which means that no maximum heap size is used. -##For more information, see process_flag(max_heap_size, MaxHeapSize). -#+hmax 0 - -## Sets the default value for process flag message_queue_data. Defaults to on_heap. -#+hmqd on_heap | off_heap - -## Sets the number of IO pollsets to use when polling for I/O. -+IOp 1 - -## Sets the number of IO poll threads to use when polling for I/O. -+IOt 1 - -## Sets the number of scheduler threads to create and scheduler threads to set online. -+S 1:1 - -## Sets the number of dirty CPU scheduler threads to create and dirty CPU scheduler threads to set online. -+SDcpu 1:1 - -## Sets the number of dirty I/O scheduler threads to create. -+SDio 1 - -## Suggested stack size, in kilowords, for scheduler threads. -#+sss 32 - -## Suggested stack size, in kilowords, for dirty CPU scheduler threads. -#+sssdcpu 40 - -## Suggested stack size, in kilowords, for dirty IO scheduler threads. -#+sssdio 40 - -## Sets scheduler bind type. -## Can be one of: u, ns, ts, ps, s, nnts, nnps, tnnps, db -#+sbt db - -## Sets a user-defined CPU topology. -#+sct L0-3c0-3p0N0:L4-7c0-3p1N1 - -## Sets the mapping of warning messages for error_logger -#+W w - -## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp -#+C no_time_warp - -## Prevents loading information about source filenames and line numbers. -+L - -## Specifies how long time (in milliseconds) to spend shutting down the system. -## See: http://erlang.org/doc/man/erl.html --shutdown_time 10000 - -## patches dir --pa "{{ platform_data_dir }}/patches" - -## Mnesia thresholds --mnesia dump_log_write_threshold 5000 --mnesia dump_log_time_threshold 60000 diff --git a/apps/emqx/etc/emqx_cloud/vm.args b/apps/emqx/etc/vm.args.cloud similarity index 100% rename from apps/emqx/etc/emqx_cloud/vm.args rename to apps/emqx/etc/vm.args.cloud diff --git a/mix.exs b/mix.exs index 66ce8e9c7..87da52b6b 100644 --- a/mix.exs +++ b/mix.exs @@ -370,7 +370,7 @@ defmodule EMQXUmbrella.MixProject do vm_args_template_path = case release_type do :cloud -> - "apps/emqx/etc/emqx_cloud/vm.args" + "apps/emqx/etc/vm.args.cloud" end render_template( diff --git a/rebar.config.erl b/rebar.config.erl index 5a73b2b93..bb790dd7c 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -412,7 +412,7 @@ emqx_etc_overlay(ReleaseType, Edition) -> emqx_etc_overlay_common(). emqx_etc_overlay_per_rel(cloud) -> - [{"{{base_dir}}/lib/emqx/etc/emqx_cloud/vm.args", "etc/vm.args"}]. + [{"{{base_dir}}/lib/emqx/etc/vm.args.cloud", "etc/vm.args"}]. emqx_etc_overlay_common() -> [{"{{base_dir}}/lib/emqx/etc/ssl_dist.conf", "etc/ssl_dist.conf"}]. From c3cd36f610adf6873981c6c4d7e0d8512e97964f Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Tue, 28 Jun 2022 15:29:26 +0800 Subject: [PATCH 6/6] fix: elvis warning --- apps/emqx_conf/src/emqx_conf_app.erl | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 7b5f5c3cb..6e62db325 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -139,17 +139,7 @@ copy_override_conf_from_core_node() -> copy_override_conf_from_core_node() end; _ -> - SortFun = fun( - {ok, #{wall_clock := W1, tnx_id := TnxId1}}, - {ok, #{wall_clock := W2, tnx_id := TnxId2}} - ) -> - if - TnxId1 > TnxId2 -> true; - TnxId1 =:= TnxId2 -> W1 > W2; - true -> false - end - end, - [{ok, Info} | _] = lists:sort(SortFun, Ready), + [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, Msg = #{ msg => "copy_overide_conf_from_core_node_success", @@ -177,3 +167,9 @@ should_proceed_with_boot() -> %% be up. Try again. false end. + +conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true; +conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) -> + W1 > W2; +conf_sort({ok, _}, {ok, _}) -> + false.