From c7e540f4f10ca99e53e0287f53ebe9674adfb273 Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Thu, 1 Jul 2021 20:08:13 +0800 Subject: [PATCH] feat(modules): Update the configuration file to hocon (#5151) --- apps/emqx_modules/etc/emqx_modules.conf | 32 ++++ apps/emqx_modules/priv/emqx_modules.schema | 1 - .../src/emqx_mod_api_topic_metrics.erl | 6 +- apps/emqx_modules/src/emqx_mod_presence.erl | 2 +- apps/emqx_modules/src/emqx_mod_rewrite.erl | 27 ++-- .../src/emqx_mod_subscription.erl | 65 -------- apps/emqx_modules/src/emqx_mod_sup.erl | 1 - apps/emqx_modules/src/emqx_modules.app.src | 2 +- apps/emqx_modules/src/emqx_modules.appup.src | 23 --- apps/emqx_modules/src/emqx_modules.erl | 142 ++++++------------ apps/emqx_modules/src/emqx_modules_api.erl | 31 ++-- apps/emqx_modules/src/emqx_modules_app.erl | 3 - apps/emqx_modules/src/emqx_modules_schema.erl | 61 ++++++++ .../test/emqx_mod_delayed_SUITE.erl | 9 +- .../test/emqx_mod_presence_SUITE.erl | 4 +- .../test/emqx_mod_rewrite_SUITE.erl | 13 +- .../test/emqx_mod_subscription_SUITE.erl | 92 ------------ apps/emqx_modules/test/emqx_mod_sup_SUITE.erl | 49 ------ apps/emqx_modules/test/emqx_modules_SUITE.erl | 48 +++--- apps/emqx_telemetry/src/emqx_telemetry.erl | 7 +- data/loaded_modules.tmpl | 2 - data/loaded_plugins.tmpl | 1 - rebar.config.erl | 8 +- 23 files changed, 217 insertions(+), 412 deletions(-) delete mode 100644 apps/emqx_modules/priv/emqx_modules.schema delete mode 100644 apps/emqx_modules/src/emqx_mod_subscription.erl delete mode 100644 apps/emqx_modules/src/emqx_modules.appup.src create mode 100644 apps/emqx_modules/src/emqx_modules_schema.erl delete mode 100644 apps/emqx_modules/test/emqx_mod_subscription_SUITE.erl delete mode 100644 apps/emqx_modules/test/emqx_mod_sup_SUITE.erl delete mode 100644 data/loaded_modules.tmpl diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf index 1bb8bf6d7..3f4681ec9 100644 --- a/apps/emqx_modules/etc/emqx_modules.conf +++ b/apps/emqx_modules/etc/emqx_modules.conf @@ -1 +1,33 @@ # empty +emqx_modules: { + modules:[ + { + type: delayed + enable: false + }, + { + type: presence + enable: true + qos: 1 + }, + { + type: recon + enable: true + }, + { + type: rewrite + enable: false + rules:[{ + action: publish + source_topic: "x/#" + re: "^x/y/(.+)$" + dest_topic: "z/y/$1" + }] + }, + { + type: topic_metrics + enable: false + topics: ["topic/#"] + } + ] +} diff --git a/apps/emqx_modules/priv/emqx_modules.schema b/apps/emqx_modules/priv/emqx_modules.schema deleted file mode 100644 index d7c52c644..000000000 --- a/apps/emqx_modules/priv/emqx_modules.schema +++ /dev/null @@ -1 +0,0 @@ -% empty diff --git a/apps/emqx_modules/src/emqx_mod_api_topic_metrics.erl b/apps/emqx_modules/src/emqx_mod_api_topic_metrics.erl index 5ccef4c6b..d78b3f18a 100644 --- a/apps/emqx_modules/src/emqx_mod_api_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_mod_api_topic_metrics.erl @@ -116,11 +116,7 @@ unregister(#{topic := Topic0}, _Params) -> end). execute_when_enabled(Fun) -> - Enabled = case emqx_modules:find_module(emqx_mod_topic_metrics) of - [{_, false}] -> false; - [{_, true}] -> true - end, - case Enabled of + case emqx_modules:find_module(topic_metrics) of true -> Fun(); false -> diff --git a/apps/emqx_modules/src/emqx_mod_presence.erl b/apps/emqx_modules/src/emqx_mod_presence.erl index 7ba147c9a..738d1c892 100644 --- a/apps/emqx_modules/src/emqx_mod_presence.erl +++ b/apps/emqx_modules/src/emqx_mod_presence.erl @@ -117,7 +117,7 @@ topic(connected, ClientId) -> topic(disconnected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])). -qos(Env) -> proplists:get_value(qos, Env, 0). +qos(Env) -> maps:get(qos, Env, 0). -compile({inline, [reason/1]}). reason(Reason) when is_atom(Reason) -> Reason; diff --git a/apps/emqx_modules/src/emqx_mod_rewrite.erl b/apps/emqx_modules/src/emqx_mod_rewrite.erl index c3a550692..e1ee6e1b6 100644 --- a/apps/emqx_modules/src/emqx_mod_rewrite.erl +++ b/apps/emqx_modules/src/emqx_mod_rewrite.erl @@ -43,8 +43,8 @@ %% Load/Unload %%-------------------------------------------------------------------- -load(RawRules) -> - {PubRules, SubRules} = compile(RawRules), +load(Env) -> + {PubRules, SubRules} = compile(maps:get(rules, Env, [])), emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}). @@ -70,20 +70,23 @@ description() -> %%-------------------------------------------------------------------- compile(Rules) -> - PubRules = [ begin - {ok, MP} = re:compile(Re), - {rewrite, Topic, MP, Dest} - end || {rewrite, pub, Topic, Re, Dest}<- Rules ], - SubRules = [ begin - {ok, MP} = re:compile(Re), - {rewrite, Topic, MP, Dest} - end || {rewrite, sub, Topic, Re, Dest}<- Rules ], - {PubRules, SubRules}. + lists:foldl(fun(#{source_topic := Topic, + re := Re, + dest_topic := Dest, + action := Action}, {Acc1, Acc2}) -> + {ok, MP} = re:compile(Re), + case Action of + publish -> + {[{Topic, MP, Dest} | Acc1], Acc2}; + subscribe -> + {Acc1, [{Topic, MP, Dest} | Acc2]} + end + end, {[], []}, Rules). match_and_rewrite(Topic, []) -> Topic; -match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules]) -> +match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules]) -> case emqx_topic:match(Topic, Filter) of true -> rewrite(Topic, MP, Dest); false -> match_and_rewrite(Topic, Rules) diff --git a/apps/emqx_modules/src/emqx_mod_subscription.erl b/apps/emqx_modules/src/emqx_mod_subscription.erl deleted file mode 100644 index 06178aee7..000000000 --- a/apps/emqx_modules/src/emqx_mod_subscription.erl +++ /dev/null @@ -1,65 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mod_subscription). - --behaviour(emqx_gen_mod). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). - -%% emqx_gen_mod callbacks --export([ load/1 - , unload/1 - , description/0 - ]). - -%% APIs --export([on_client_connected/3]). - -%%-------------------------------------------------------------------- -%% Load/Unload Hook -%%-------------------------------------------------------------------- - -load(Topics) -> - emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). - -on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) -> - Replace = fun(Topic) -> - rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) - end, - TopicFilters = case ProtoVer of - ?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics]; - _ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics] - end, - self() ! {subscribe, TopicFilters}. - -unload(_) -> - emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). - -description() -> - "EMQ X Subscription Module". -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -rep(<<"%c">>, ClientId, Topic) -> - emqx_topic:feed_var(<<"%c">>, ClientId, Topic); -rep(<<"%u">>, undefined, Topic) -> - Topic; -rep(<<"%u">>, Username, Topic) -> - emqx_topic:feed_var(<<"%u">>, Username, Topic). - diff --git a/apps/emqx_modules/src/emqx_mod_sup.erl b/apps/emqx_modules/src/emqx_mod_sup.erl index 755e52a60..c47d47a10 100644 --- a/apps/emqx_modules/src/emqx_mod_sup.erl +++ b/apps/emqx_modules/src/emqx_mod_sup.erl @@ -60,7 +60,6 @@ stop_child(ChildId) -> %%-------------------------------------------------------------------- init([]) -> - ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]), {ok, {{one_for_one, 10, 100}, []}}. %%-------------------------------------------------------------------- diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src index 702652fc2..5d251abc9 100644 --- a/apps/emqx_modules/src/emqx_modules.app.src +++ b/apps/emqx_modules/src/emqx_modules.app.src @@ -1,6 +1,6 @@ {application, emqx_modules, [{description, "EMQ X Module Management"}, - {vsn, "4.3.2"}, + {vsn, "5.0.0"}, {modules, []}, {applications, [kernel,stdlib]}, {mod, {emqx_modules_app, []}}, diff --git a/apps/emqx_modules/src/emqx_modules.appup.src b/apps/emqx_modules/src/emqx_modules.appup.src deleted file mode 100644 index aa997c453..000000000 --- a/apps/emqx_modules/src/emqx_modules.appup.src +++ /dev/null @@ -1,23 +0,0 @@ -%% -*-: erlang -*- -{VSN, - [ - {"4.3.1", [ - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {update, emqx_mod_delayed, {advanced, []}}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ], - [ - {"4.3.1", [ - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {update, emqx_mod_delayed, {advanced, []}}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_modules/src/emqx_modules.erl b/apps/emqx_modules/src/emqx_modules.erl index 262e700ab..6610a4716 100644 --- a/apps/emqx_modules/src/emqx_modules.erl +++ b/apps/emqx_modules/src/emqx_modules.erl @@ -22,12 +22,11 @@ -export([ list/0 , load/0 - , load/1 + , load/2 , unload/0 , unload/1 , reload/1 , find_module/1 - , load_module/2 ]). -export([cli/1]). @@ -35,48 +34,51 @@ %% @doc List all available plugins -spec(list() -> [{atom(), boolean()}]). list() -> - ets:tab2list(?MODULE). + persistent_term:get(?MODULE, []). %% @doc Load all the extended modules. -spec(load() -> ok). load() -> - case emqx:get_env(modules_loaded_file) of - undefined -> ok; - File -> - load_modules(File) - end. + Modules = emqx_config:get([emqx_modules, modules], []), + lists:foreach(fun(#{type := Module, enable := Enable} = Config) -> + case Enable of + true -> + load(name(Module), maps:without([type, enable], Config)); + false -> + ok + end + end, Modules). -load(ModuleName) -> +load(Module, Env) -> + ModuleName = name(Module), case find_module(ModuleName) of - [] -> - ?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]), - {error, not_found}; - [{ModuleName, true}] -> + false -> + load_mod(ModuleName, Env); + true -> ?LOG(notice, "Module ~s is already started", [ModuleName]), - {error, already_started}; - [{ModuleName, false}] -> - emqx_modules:load_module(ModuleName, true) + {error, already_started} end. %% @doc Unload all the extended modules. -spec(unload() -> ok). unload() -> - case emqx:get_env(modules_loaded_file) of - undefined -> ignore; - File -> - unload_modules(File) - end. + Modules = emqx_config:get([emqx_modules, modules], []), + lists:foreach(fun(#{type := Module, enable := Enable}) -> + case Enable of + true -> + unload_mod(name(Module)); + false -> + ok + end + end, Modules). unload(ModuleName) -> case find_module(ModuleName) of - [] -> + false -> ?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]), - {error, not_found}; - [{ModuleName, false}] -> - ?LOG(error, "Module ~s is not started", [ModuleName]), {error, not_started}; - [{ModuleName, true}] -> - unload_module(ModuleName, true) + true -> + unload_mod(ModuleName) end. -spec(reload(module()) -> ok | ignore | {error, any()}). @@ -84,94 +86,39 @@ reload(_) -> ignore. find_module(ModuleName) -> - ets:lookup(?MODULE, ModuleName). + lists:member(ModuleName, persistent_term:get(?MODULE, [])). -filter_module(ModuleNames) -> - filter_module(ModuleNames, emqx:get_env(modules, [])). -filter_module([], Acc) -> - Acc; -filter_module([{ModuleName, true} | ModuleNames], Acc) -> - filter_module(ModuleNames, lists:keydelete(ModuleName, 1, Acc)); -filter_module([{_, false} | ModuleNames], Acc) -> - filter_module(ModuleNames, Acc). - -load_modules(File) -> - case file:consult(File) of - {ok, ModuleNames} -> - lists:foreach(fun({ModuleName, _}) -> - ets:insert(?MODULE, {ModuleName, false}) - end, filter_module(ModuleNames)), - lists:foreach(fun load_module/1, ModuleNames); - {error, Error} -> - ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]) - end. - -load_module({ModuleName, true}) -> - emqx_modules:load_module(ModuleName, false); -load_module({ModuleName, false}) -> - ets:insert(?MODULE, {ModuleName, false}); -load_module(ModuleName) -> - load_module({ModuleName, true}). - -load_module(ModuleName, Persistent) -> - Modules = emqx:get_env(modules, []), - Env = proplists:get_value(ModuleName, Modules, undefined), +load_mod(ModuleName, Env) -> case ModuleName:load(Env) of ok -> - ets:insert(?MODULE, {ModuleName, true}), - ok = write_loaded(Persistent), + Modules = persistent_term:get(?MODULE, []), + persistent_term:put(?MODULE, [ModuleName| Modules]), ?LOG(info, "Load ~s module successfully.", [ModuleName]); {error, Error} -> ?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]), {error, Error} end. -unload_modules(File) -> - case file:consult(File) of - {ok, ModuleNames} -> - lists:foreach(fun unload_module/1, ModuleNames); - {error, Error} -> - ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]) - end. -unload_module({ModuleName, true}) -> - unload_module(ModuleName, false); -unload_module({ModuleName, false}) -> - ets:insert(?MODULE, {ModuleName, false}); -unload_module(ModuleName) -> - unload_module({ModuleName, true}). - -unload_module(ModuleName, Persistent) -> - Modules = emqx:get_env(modules, []), - Env = proplists:get_value(ModuleName, Modules, undefined), - case ModuleName:unload(Env) of +unload_mod(ModuleName) -> + case ModuleName:unload(#{}) of ok -> - ets:insert(?MODULE, {ModuleName, false}), - ok = write_loaded(Persistent), + Modules = persistent_term:get(?MODULE, []), + persistent_term:put(?MODULE, Modules -- [ModuleName]), ?LOG(info, "Unload ~s module successfully.", [ModuleName]); {error, Error} -> ?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error]) end. -write_loaded(true) -> - FilePath = emqx:get_env(modules_loaded_file), - case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of - ok -> ok; - {error, Error} -> - ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]), - ok - end; -write_loaded(false) -> ok. - %%-------------------------------------------------------------------- %% @doc Modules Command cli(["list"]) -> - lists:foreach(fun({Name, Active}) -> - emqx_ctl:print("Module(~s, description=~s, active=~s)~n", - [Name, Name:description(), Active]) + lists:foreach(fun(Name) -> + emqx_ctl:print("Module(~s, description=~s)~n", + [Name, Name:description()]) end, emqx_modules:list()); cli(["load", Name]) -> - case emqx_modules:load(list_to_atom(Name)) of + case emqx_modules:load(list_to_atom(Name), #{}) of ok -> emqx_ctl:print("Module ~s loaded successfully.~n", [Name]); {error, Reason} -> @@ -195,3 +142,10 @@ cli(_) -> {"modules unload ", "Unload module"}, {"modules reload ", "Reload module"} ]). + +name(delayed) -> emqx_mod_delayed; +name(presence) -> emqx_mod_presence; +name(recon) -> emqx_mod_recon; +name(rewrite) -> emqx_mod_rewrite; +name(topic_metrics) -> emqx_mod_topic_metrics; +name(Name) -> Name. diff --git a/apps/emqx_modules/src/emqx_modules_api.erl b/apps/emqx_modules/src/emqx_modules_api.erl index 3490c116c..3a4b05fd0 100644 --- a/apps/emqx_modules/src/emqx_modules_api.erl +++ b/apps/emqx_modules/src/emqx_modules_api.erl @@ -73,7 +73,7 @@ , reload/2 ]). --export([ do_load_module/2 +-export([ do_load_module/3 , do_unload_module/2 ]). @@ -83,11 +83,11 @@ list(#{node := Node}, _Params) -> list(_Bindings, _Params) -> return({ok, [format(Node, Modules) || {Node, Modules} <- list_modules()]}). -load(#{node := Node, module := Module}, _Params) -> - return(do_load_module(Node, Module)); +load(#{node := Node, module := Module}, Params) -> + return(do_load_module(Node, Module, Params)); -load(#{module := Module}, _Params) -> - Results = [do_load_module(Node, Module) || Node <- ekka_mnesia:running_nodes()], +load(#{module := Module}, Params) -> + Results = [do_load_module(Node, Module, Params) || Node <- ekka_mnesia:running_nodes()], case lists:filter(fun(Item) -> Item =/= ok end, Results) of [] -> return(ok); @@ -129,10 +129,9 @@ reload(#{module := Module}, _Params) -> format(Node, Modules) -> #{node => Node, modules => [format(Module) || Module <- Modules]}. -format({Name, Active}) -> - #{name => Name, - description => iolist_to_binary(Name:description()), - active => Active}. +format(Name) -> + #{name => name(Name), + description => iolist_to_binary(Name:description())}. list_modules() -> [{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()]. @@ -142,10 +141,10 @@ list_modules(Node) when Node =:= node() -> list_modules(Node) -> rpc_call(Node, list_modules, [Node]). -do_load_module(Node, Module) when Node =:= node() -> - emqx_modules:load(Module); -do_load_module(Node, Module) -> - rpc_call(Node, do_load_module, [Node, Module]). +do_load_module(Node, Module, Env) when Node =:= node() -> + emqx_modules:load(Module, Env); +do_load_module(Node, Module, Env) -> + rpc_call(Node, do_load_module, [Node, Module, Env]). do_unload_module(Node, Module) when Node =:= node() -> emqx_modules:unload(Module); @@ -162,3 +161,9 @@ rpc_call(Node, Fun, Args) -> {badrpc, Reason} -> {error, Reason}; Res -> Res end. + +name(emqx_mod_delayed) -> delayed; +name(emqx_mod_presence) -> presence; +name(emqx_mod_recon) -> recon; +name(emqx_mod_rewrite) -> rewrite; +name(emqx_mod_topic_metrics) -> topic_metrics. diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index a10176829..33f18459e 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -23,9 +23,6 @@ -export([stop/1]). start(_Type, _Args) -> - % the configs for emqx_modules is so far still in emqx application - % Ensure it's loaded - _ = application:load(emqx), {ok, Pid} = emqx_mod_sup:start_link(), ok = emqx_modules:load(), emqx_ctl:register_command(modules, {emqx_modules, cli}, []), diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl new file mode 100644 index 000000000..e7b26b7de --- /dev/null +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -0,0 +1,61 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_modules_schema). + +-include_lib("typerefl/include/types.hrl"). + +-behaviour(hocon_schema). + +-export([ structs/0 + , fields/1]). + +structs() -> ["emqx_modules"]. + +fields("emqx_modules") -> + [{modules, hoconsc:array(hoconsc:union([ hoconsc:ref(?MODULE, "common") + , hoconsc:ref(?MODULE, "presence") + , hoconsc:ref(?MODULE, "rewrite") + , hoconsc:ref(?MODULE, "topic_metrics") + ]))}]; +fields("common") -> + [ {type, hoconsc:enum([delayed, recon])} + , {enable, emqx_schema:t(boolean(), undefined, false)} + ]; + +fields("presence") -> + [ {type, hoconsc:enum([presence])} + , {enable, emqx_schema:t(boolean(), undefined, false)} + , {qos, emqx_schema:t(integer(), undefined, 1)} + ]; +fields("rewrite") -> + [ {type, hoconsc:enum([rewrite])} + , {enable, emqx_schema:t(boolean(), undefined, false)} + , {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))} + ]; + +fields("topic_metrics") -> + [ {type, hoconsc:enum([topic_metrics])} + , {enable, emqx_schema:t(boolean(), undefined, false)} + , {topics, hoconsc:array(binary())} + ]; + +fields("rules") -> + [ {action, hoconsc:enum([publish, subscribe])} + , {source_topic, emqx_schema:t(binary())} + , {re, emqx_schema:t(binary())} + , {dest_topic, emqx_schema:t(binary())} + ]. diff --git a/apps/emqx_modules/test/emqx_mod_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_mod_delayed_SUITE.erl index fcd73bb61..2cd1107f6 100644 --- a/apps/emqx_modules/test/emqx_mod_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_mod_delayed_SUITE.erl @@ -35,19 +35,12 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_modules], fun set_special_configs/1), + emqx_ct_helpers:start_apps([emqx_modules]), Config. end_per_suite(_) -> emqx_ct_helpers:stop_apps([emqx_modules]). -set_special_configs(emqx) -> - application:set_env(emqx, modules, [{emqx_mod_delayed, []}]), - application:set_env(emqx, allow_anonymous, false), - application:set_env(emqx, enable_acl_cache, false); -set_special_configs(_App) -> - ok. - %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- diff --git a/apps/emqx_modules/test/emqx_mod_presence_SUITE.erl b/apps/emqx_modules/test/emqx_mod_presence_SUITE.erl index fafcc3c2f..e02fa6fdd 100644 --- a/apps/emqx_modules/test/emqx_mod_presence_SUITE.erl +++ b/apps/emqx_modules/test/emqx_mod_presence_SUITE.erl @@ -36,7 +36,7 @@ end_per_suite(_Config) -> %% Test case for emqx_mod_presence t_mod_presence(_) -> - ok = emqx_mod_presence:load([{qos, ?QOS_1}]), + ok = emqx_mod_presence:load(#{qos => ?QOS_1}), {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), {ok, _} = emqtt:connect(C1), {ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1), @@ -49,7 +49,7 @@ t_mod_presence(_) -> ok = emqtt:disconnect(C2), ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>), ok = emqtt:disconnect(C1), - ok = emqx_mod_presence:unload([{qos, ?QOS_1}]). + ok = emqx_mod_presence:unload([]). t_mod_presence_reason(_) -> ?assertEqual(normal, emqx_mod_presence:reason(normal)), diff --git a/apps/emqx_modules/test/emqx_mod_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_mod_rewrite_SUITE.erl index 997eff1c2..9b94cba8c 100644 --- a/apps/emqx_modules/test/emqx_mod_rewrite_SUITE.erl +++ b/apps/emqx_modules/test/emqx_mod_rewrite_SUITE.erl @@ -22,8 +22,15 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>}, - {rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>} +-define(RULES, [#{action => publish, + source_topic => <<"x/#">>, + re => <<"^x/y/(.+)$">>, + dest_topic => <<"z/y/$1">> + }, + #{action => subscribe, + source_topic => <<"y/+/z/#">>, + re => <<"^y/(.+)/z/(.+)$">>, + dest_topic => <<"y/z/$2">>} ]). all() -> emqx_ct:all(?MODULE). @@ -40,7 +47,7 @@ end_per_suite(_Config) -> %% Test case for emqx_mod_write t_mod_rewrite(_Config) -> - ok = emqx_mod_rewrite:load(?RULES), + ok = emqx_mod_rewrite:load(#{rules => ?RULES}), {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), {ok, _} = emqtt:connect(C), PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], diff --git a/apps/emqx_modules/test/emqx_mod_subscription_SUITE.erl b/apps/emqx_modules/test/emqx_mod_subscription_SUITE.erl deleted file mode 100644 index c2905754b..000000000 --- a/apps/emqx_modules/test/emqx_mod_subscription_SUITE.erl +++ /dev/null @@ -1,92 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mod_subscription_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -init_per_suite(Config) -> - emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([]), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). - -t_on_client_connected(_) -> - ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])), - {ok, C} = emqtt:start_link([{host, "localhost"}, - {clientid, "myclient"}, - {username, "admin"}]), - {ok, _} = emqtt:connect(C), - emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0), - {ok, #{topic := Topic, payload := Payload}} = receive_publish(100), - ?assertEqual(<<"connected/myclient/admin">>, Topic), - ?assertEqual(<<"Hello world">>, Payload), - ok = emqtt:disconnect(C), - ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])). - -t_on_undefined_client_connected(_) -> - ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/undefined">>, #{qos => ?QOS_1}}])), - {ok, C} = emqtt:start_link([{host, "localhost"}]), - {ok, _} = emqtt:connect(C), - emqtt:publish(C, <<"connected/undefined">>, <<"Hello world">>, ?QOS_1), - {ok, #{topic := Topic, payload := Payload}} = receive_publish(100), - ?assertEqual(<<"connected/undefined">>, Topic), - ?assertEqual(<<"Hello world">>, Payload), - ok = emqtt:disconnect(C), - ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, #{qos => ?QOS_1}}])). - -t_suboption(_) -> - Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end, - Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2}, - ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])), - {ok, C1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - timer:sleep(200), - [CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)), - [ Sub1 | _ ] = ets:lookup(emqx_subscription,CPid1), - [ Suboption1 | _ ] = ets:lookup(emqx_suboption,Sub1), - ?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1), - ok = emqtt:disconnect(C1), - %% The subscription option is not valid for MQTT V3.1.1 - {ok, C2} = emqtt:start_link([{proto_ver, v4}]), - {ok, _} = emqtt:connect(C2), - timer:sleep(200), - [CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)), - [ Sub2 | _ ] = ets:lookup(emqx_subscription,CPid2), - [ Suboption2 | _ ] = ets:lookup(emqx_suboption,Sub2), - ok = emqtt:disconnect(C2), - ?assertMatch({Sub2, #{qos := 2, nl := 0, rap := 0, rh := 0, subid := _}}, Suboption2), - - ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, Suboption}])). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -receive_publish(Timeout) -> - receive - {publish, Publish} -> {ok, Publish} - after - Timeout -> {error, timeout} - end. diff --git a/apps/emqx_modules/test/emqx_mod_sup_SUITE.erl b/apps/emqx_modules/test/emqx_mod_sup_SUITE.erl deleted file mode 100644 index 59d0ffde2..000000000 --- a/apps/emqx_modules/test/emqx_mod_sup_SUITE.erl +++ /dev/null @@ -1,49 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mod_sup_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -%%-------------------------------------------------------------------- -%% Test cases -%%-------------------------------------------------------------------- - -t_start(_) -> - ?assertEqual([], supervisor:which_children(emqx_mod_sup)). - -t_start_child(_) -> - %% Set the emqx_mod_sup child with emqx_hooks for test - Mod = emqx_hooks, - Spec = #{id => Mod, - start => {Mod, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [Mod]}, - - ok = emqx_mod_sup:start_child(Mod, worker), - ?assertError({already_started, _}, emqx_mod_sup:start_child(Spec)), - - ok = emqx_mod_sup:stop_child(Mod), - {error, not_found} = emqx_mod_sup:stop_child(Mod), - ok. - diff --git a/apps/emqx_modules/test/emqx_modules_SUITE.erl b/apps/emqx_modules/test/emqx_modules_SUITE.erl index 0ce8b0c5f..e8e0b3218 100644 --- a/apps/emqx_modules/test/emqx_modules_SUITE.erl +++ b/apps/emqx_modules/test/emqx_modules_SUITE.erl @@ -37,7 +37,6 @@ init_per_suite(Config) -> Config. set_special_cfg(_) -> - application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")), ok. end_per_suite(_Config) -> @@ -47,70 +46,67 @@ end_per_suite(_Config) -> t_load(_) -> ?assertEqual(ok, emqx_modules:unload()), ?assertEqual(ok, emqx_modules:load()), - ?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)), - ?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)), - ?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)). + ?assertEqual({error, not_started}, emqx_modules:unload(rewrite)), + ?assertEqual(ignore, emqx_modules:reload(rewrite)). t_list(_) -> - ?assertMatch([{_, _} | _ ], emqx_modules:list()). + emqx_modules:load(presence, #{qos => 1}), + ?assertMatch([_ | _ ], emqx_modules:list()), + emqx_modules:unload(presence). t_modules_api(_) -> - emqx_modules:load_module(emqx_mod_presence, false), + emqx_modules:load(presence, #{qos => 1}), timer:sleep(50), {ok, Modules1} = request_api(get, api_path(["modules"]), auth_header_()), [Modules11] = filter(get(<<"data">>, Modules1), <<"node">>, atom_to_binary(node(), utf8)), - [Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"emqx_mod_presence">>), - ?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module1)), - ?assertEqual(true, maps:get(<<"active">>, Module1)), - + [Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"presence">>), + ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module1)), {ok, _} = request_api(put, api_path(["modules", - atom_to_list(emqx_mod_presence), + atom_to_list(presence), "unload"]), auth_header_()), {ok, Error1} = request_api(put, api_path(["modules", - atom_to_list(emqx_mod_presence), + atom_to_list(presence), "unload"]), auth_header_()), ?assertEqual(<<"not_started">>, get(<<"message">>, Error1)), {ok, Modules2} = request_api(get, api_path(["nodes", atom_to_list(node()), "modules"]), auth_header_()), - [Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"emqx_mod_presence">>), - ?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module2)), - ?assertEqual(false, maps:get(<<"active">>, Module2)), + [Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"presence">>), + ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module2)), {ok, _} = request_api(put, api_path(["nodes", atom_to_list(node()), "modules", - atom_to_list(emqx_mod_presence), + atom_to_list(presence), "load"]), auth_header_()), {ok, Modules3} = request_api(get, api_path(["nodes", atom_to_list(node()), "modules"]), auth_header_()), - [Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"emqx_mod_presence">>), - ?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module3)), - ?assertEqual(true, maps:get(<<"active">>, Module3)), + [Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"presence">>), + ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module3)), {ok, _} = request_api(put, api_path(["nodes", atom_to_list(node()), "modules", - atom_to_list(emqx_mod_presence), + atom_to_list(presence), "unload"]), auth_header_()), {ok, Error2} = request_api(put, api_path(["nodes", atom_to_list(node()), "modules", - atom_to_list(emqx_mod_presence), + atom_to_list(presence), "unload"]), auth_header_()), ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)), - emqx_modules:unload(emqx_mod_presence). + emqx_modules:unload(presence). t_modules_cmd(_) -> @@ -120,10 +116,10 @@ t_modules_cmd(_) -> meck:expect(emqx_modules, unload, fun(_) -> ok end), meck:expect(emqx_modules, reload, fun(_) -> ok end), ?assertEqual(emqx_modules:cli(["list"]), ok), - ?assertEqual(emqx_modules:cli(["load", "emqx_mod_presence"]), - "Module emqx_mod_presence loaded successfully.\n"), - ?assertEqual(emqx_modules:cli(["unload", "emqx_mod_presence"]), - "Module emqx_mod_presence unloaded successfully.\n"), + ?assertEqual(emqx_modules:cli(["load", "delayed"]), + "Module delayed loaded successfully.\n"), + ?assertEqual(emqx_modules:cli(["unload", "delayed"]), + "Module delayed unloaded successfully.\n"), unmock_print(). %% For: https://github.com/emqx/emqx/issues/4511 diff --git a/apps/emqx_telemetry/src/emqx_telemetry.erl b/apps/emqx_telemetry/src/emqx_telemetry.erl index dd6e7aa4c..9bf616b0a 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry.erl +++ b/apps/emqx_telemetry/src/emqx_telemetry.erl @@ -307,12 +307,7 @@ active_plugins() -> end, [], emqx_plugins:list()). active_modules() -> - lists:foldl(fun({Name, Persistent}, Acc) -> - case Persistent of - true -> [Name | Acc]; - false -> Acc - end - end, [], emqx_modules:list()). + emqx_modules:list(). num_clients() -> emqx_stats:getstat('connections.max'). diff --git a/data/loaded_modules.tmpl b/data/loaded_modules.tmpl deleted file mode 100644 index 8dfe6453e..000000000 --- a/data/loaded_modules.tmpl +++ /dev/null @@ -1,2 +0,0 @@ -{emqx_mod_presence, true}. -{emqx_mod_recon, true}. diff --git a/data/loaded_plugins.tmpl b/data/loaded_plugins.tmpl index 80a0c832c..dc23386dc 100644 --- a/data/loaded_plugins.tmpl +++ b/data/loaded_plugins.tmpl @@ -1,4 +1,3 @@ {emqx_management, true}. {emqx_dashboard, true}. -{emqx_modules, {{enable_plugin_emqx_modules}}}. {emqx_retainer, {{enable_plugin_emqx_retainer}}}. diff --git a/rebar.config.erl b/rebar.config.erl index 5d5d02d05..13bbb1c15 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -192,8 +192,8 @@ overlay_vars_rel(RelType) -> cloud -> "vm.args"; edge -> "vm.args.edge" end, - [ {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce - , {enable_plugin_emqx_retainer, true} + [ + {enable_plugin_emqx_retainer, true} , {vm_args_file, VmArgs} ]. @@ -256,9 +256,9 @@ relx_apps(ReleaseType) -> , emqx_data_bridge , emqx_rule_engine , emqx_bridge_mqtt + , emqx_modules ] ++ [emqx_telemetry || not is_enterprise()] - ++ [emqx_modules || not is_enterprise()] ++ [emqx_license || is_enterprise()] ++ [bcrypt || provide_bcrypt_release(ReleaseType)] ++ relx_apps_per_rel(ReleaseType) @@ -318,7 +318,6 @@ relx_overlay(ReleaseType) -> , {mkdir, "data/patches"} , {mkdir, "data/scripts"} , {template, "data/loaded_plugins.tmpl", "data/loaded_plugins"} - , {template, "data/loaded_modules.tmpl", "data/loaded_modules"} , {template, "data/emqx_vars", "releases/emqx_vars"} , {template, "data/BUILT_ON", "releases/{{release_version}}/BUILT_ON"} , {copy, "bin/emqx", "bin/emqx"} @@ -376,6 +375,7 @@ emqx_etc_overlay_common() -> {"{{base_dir}}/lib/emqx_authz/etc/emqx_authz.conf", "etc/plugins/authz.conf"}, {"{{base_dir}}/lib/emqx_rule_engine/etc/emqx_rule_engine.conf", "etc/plugins/emqx_rule_engine.conf"}, {"{{base_dir}}/lib/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf", "etc/plugins/emqx_bridge_mqtt.conf"}, + {"{{base_dir}}/lib/emqx_modules/etc/emqx_modules.conf", "etc/plugins/emqx_modules.conf"}, %% TODO: check why it has to end with .paho %% and why it is put to etc/plugins dir {"{{base_dir}}/lib/emqx/etc/acl.conf.paho", "etc/plugins/acl.conf.paho"}].