From 54aeacee1444108c061e16b9d6fdb2a5d0bcf58f Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 28 Jun 2021 11:30:38 +0800 Subject: [PATCH] feat(rule-engine): update the configuration file to hocon --- .../etc/emqx_rule_engine.conf | 44 ++----------- .../priv/emqx_rule_engine.schema | 61 ------------------- .../src/emqx_rule_engine.app.src | 2 +- .../src/emqx_rule_engine.appup.src | 38 ------------ .../src/emqx_rule_engine_schema.erl | 29 +++++++++ .../emqx_rule_engine/src/emqx_rule_events.erl | 45 ++++---------- .../test/emqx_rule_engine_SUITE.erl | 19 +----- .../src/emqx_telemetry_schema.erl | 18 +++++- data/loaded_plugins.tmpl | 1 - rebar.config.erl | 5 +- 10 files changed, 66 insertions(+), 196 deletions(-) delete mode 100644 apps/emqx_rule_engine/priv/emqx_rule_engine.schema delete mode 100644 apps/emqx_rule_engine/src/emqx_rule_engine.appup.src create mode 100644 apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl diff --git a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf index 556c59970..c1637d66d 100644 --- a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf +++ b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf @@ -1,42 +1,6 @@ ##==================================================================== -## Rule Engine for EMQ X R4.0 +## Rule Engine for EMQ X R5.0 ##==================================================================== - -rule_engine.ignore_sys_message = on - -## Event Messages -## -## If enabled (on), rule engine publishes the event as an MQTT message -## with topic='$events/' on the occurrence of an emqx event. -## -## If disabled, rule engine stops publishing the event messages, but -## the event message can still be processed by the rule SQL. e.g. rule SQL: -## -## SELECT * FROM "$events/client_connected" -## -## will still work even if 'rule_engine.events.client_connected' is set to 'off' -## -## EMQ Event to event message mapping: -## -## - client.connected -> $events/client_connected -## - client.disconnected -> $events/client_disconnected -## - session.subscribed -> $events/session_subscribed -## - session.unsubscribed -> $events/session_unsubscribed -## - message.delivered -> $events/message_delivered -## - message.acked -> $events/message_acked -## - message.dropped -> $events/message_dropped -## -## Config Value Format: Toggle, QoS-Level -## -## Toggle: on/off -## -## QoS-Level: qos0/qos1/qos2 - -#rule_engine.events.client_connected = "on, qos1" -rule_engine.events.client_connected = off -rule_engine.events.client_disconnected = off -rule_engine.events.session_subscribed = off -rule_engine.events.session_unsubscribed = off -rule_engine.events.message_delivered = off -rule_engine.events.message_acked = off -rule_engine.events.message_dropped = off +emqx_rule_engine:{ + ignore_sys_message: true +} diff --git a/apps/emqx_rule_engine/priv/emqx_rule_engine.schema b/apps/emqx_rule_engine/priv/emqx_rule_engine.schema deleted file mode 100644 index c5549aa36..000000000 --- a/apps/emqx_rule_engine/priv/emqx_rule_engine.schema +++ /dev/null @@ -1,61 +0,0 @@ -%%-*- mode: erlang -*- -%% emqx_rule_engine config mapping - -{mapping, "rule_engine.ignore_sys_message", "emqx_rule_engine.ignore_sys_message", [ - {default, on}, - {datatype, flag} -]}. - -{mapping, "rule_engine.events.$name", "emqx_rule_engine.events", [ - {default, "off, qos1"}, - {datatype, string} -]}. - -{translation, "emqx_rule_engine.events", fun(Conf) -> - SupportedHooks = - [ 'client.connected' - , 'client.disconnected' - , 'session.subscribed' - , 'session.unsubscribed' - , 'message.delivered' - , 'message.acked' - , 'message.dropped' - ], - - HookPoint = fun(Event) -> - case string:split(Event, "_") of - [Prefix, Name] -> - Point = list_to_atom(lists:append([Prefix, ".", Name])), - case lists:member(Point, SupportedHooks) of - true -> Point; - false -> error({unsupported_event, Event}) - end; - [_] -> - error({invalid_event, Event}) - end - end, - - QoS = fun ("qos"++Level = QoSLevel) -> - case list_to_integer(Level) of - QoSL when QoSL =:= 0; QoSL =:= 1; QoSL =:= 2 -> - QoSL; - _ -> - error({invalid_qos_level, QoSLevel}) - end; - (QoSLevel) -> - error({invalid_qos, QoSLevel}) - end, - - lists:foldl( - fun({EE=[_,"events",EvtName], Val}, Acc) -> - case string:split(string:trim(Val), ",", all) of - ["on"++_, Snd] -> - [{HookPoint(EvtName), on, QoS(string:trim(Snd))} | Acc]; - ["on"++_] -> - [{HookPoint(EvtName), on, 1} | Acc]; - [_] -> - Acc - end; - ({_, _}, Acc) -> Acc - end, [], cuttlefish_variable:filter_by_prefix("rule_engine.events", Conf)) -end}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index aebb73150..ff25dcfd3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.3.3"}, % strict semver, bump manually! + {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src deleted file mode 100644 index 01c07c124..000000000 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ /dev/null @@ -1,38 +0,0 @@ -%% -*-: erlang -*- -{"4.3.3", - [ {"4.3.0", - [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []} - , {load_module, emqx_rule_engine, brutal_purge, soft_purge, []} - , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []} - , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}} - ]}, - {"4.3.1", - [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []} - , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []} - , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}} - ]}, - {"4.3.2", - [ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []} - , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}} - ]}, - {<<".*">>, []} - ], - [ - {"4.3.0", - [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []} - , {load_module, emqx_rule_engine, brutal_purge, soft_purge, []} - , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []} - , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}} - ]}, - {"4.3.1", - [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []} - , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []} - , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}} - ]}, - {"4.3.2", - [ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []} - , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}} - ]}, - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl new file mode 100644 index 000000000..f7658c208 --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -0,0 +1,29 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_schema). + +-include_lib("typerefl/include/types.hrl"). + +-behaviour(hocon_schema). + +-export([ structs/0 + , fields/1]). + +structs() -> ["emqx_rule_engine"]. + +fields("emqx_rule_engine") -> + [{ignore_sys_message, emqx_schema:t(boolean(), undefined, true)}]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 97e40439d..26689b022 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -63,9 +63,10 @@ -endif. load(Topic) -> + IgnoreSys = proplists:get_value(ignore_sys_message, env(), true), HookPoint = event_name(Topic), emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), - [hook_conf(HookPoint, env())]}). + [#{ignore_sys_message => IgnoreSys}]}). unload() -> lists:foreach(fun(HookPoint) -> @@ -97,26 +98,26 @@ on_message_publish(Message = #message{topic = Topic}, _Env) -> {ok, Message}. on_client_connected(ClientInfo, ConnInfo, Env) -> - may_publish_and_apply('client.connected', + apply_event('client.connected', fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env). on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> - may_publish_and_apply('client.disconnected', + apply_event('client.disconnected', fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env). on_session_subscribed(ClientInfo, Topic, SubOpts, Env) -> - may_publish_and_apply('session.subscribed', + apply_event('session.subscribed', fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env). on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) -> - may_publish_and_apply('session.unsubscribed', + apply_event('session.unsubscribed', fun() -> eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) end, Env). on_message_dropped(Message = #message{flags = #{sys := true}}, _, _, #{ignore_sys_message := true}) -> {ok, Message}; on_message_dropped(Message, _, Reason, Env) -> - may_publish_and_apply('message.dropped', + apply_event('message.dropped', fun() -> eventmsg_dropped(Message, Reason) end, Env), {ok, Message}. @@ -124,7 +125,7 @@ on_message_delivered(_ClientInfo, Message = #message{flags = #{sys := true}}, #{ignore_sys_message := true}) -> {ok, Message}; on_message_delivered(ClientInfo, Message, Env) -> - may_publish_and_apply('message.delivered', + apply_event('message.delivered', fun() -> eventmsg_delivered(ClientInfo, Message) end, Env), {ok, Message}. @@ -132,7 +133,7 @@ on_message_acked(_ClientInfo, Message = #message{flags = #{sys := true}}, #{ignore_sys_message := true}) -> {ok, Message}; on_message_acked(ClientInfo, Message, Env) -> - may_publish_and_apply('message.acked', + apply_event('message.acked', fun() -> eventmsg_acked(ClientInfo, Message) end, Env), {ok, Message}. @@ -297,31 +298,15 @@ with_basic_columns(EventName, Data) when is_map(Data) -> }. %%-------------------------------------------------------------------- -%% Events publishing and rules applying +%% rules applying %%-------------------------------------------------------------------- - -may_publish_and_apply(EventName, GenEventMsg, #{enabled := true, qos := QoS}) -> - EventTopic = event_topic(EventName), - EventMsg = GenEventMsg(), - case emqx_json:safe_encode(EventMsg) of - {ok, Payload} -> - _ = emqx_broker:safe_publish(make_msg(QoS, EventTopic, Payload)), - ok; - {error, _Reason} -> - ?LOG(error, "Failed to encode event msg for ~p, msg: ~p", [EventName, EventMsg]) - end, - emqx_rule_runtime:apply_rules(emqx_rule_registry:get_rules_for(EventTopic), EventMsg); -may_publish_and_apply(EventName, GenEventMsg, _Env) -> +apply_event(EventName, GenEventMsg, _Env) -> EventTopic = event_topic(EventName), case emqx_rule_registry:get_rules_for(EventTopic) of [] -> ok; Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) end. -make_msg(QoS, Topic, Payload) -> - emqx_message:set_flags(#{sys => true, event => true}, - emqx_message:make(emqx_events, QoS, Topic, iolist_to_binary(Payload))). - %%-------------------------------------------------------------------- %% Columns %%-------------------------------------------------------------------- @@ -559,14 +544,6 @@ columns_with_exam('session.unsubscribed') -> %% Helper functions %%-------------------------------------------------------------------- -hook_conf(HookPoint, Env) -> - Events = proplists:get_value(events, Env, []), - IgnoreSys = proplists:get_value(ignore_sys_message, Env, true), - case lists:keyfind(HookPoint, 1, Events) of - {_, on, QoS} -> #{enabled => true, qos => QoS, ignore_sys_message => IgnoreSys}; - _ -> #{enabled => false, qos => 1, ignore_sys_message => IgnoreSys} - end. - hook_fun(Event) -> case string:split(atom_to_list(Event), ".") of [Prefix, Name] -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index d8244c018..da3e963f0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -149,11 +149,11 @@ groups() -> init_per_suite(Config) -> ok = ekka_mnesia:start(), ok = emqx_rule_registry:mnesia(boot), - start_apps(), + ok = emqx_ct_helpers:start_apps([emqx_rule_engine], fun set_special_configs/1), Config. end_per_suite(_Config) -> - stop_apps(), + emqx_ct_helpers:stop_apps([emqx_rule_engine]), ok. on_resource_create(_id, _) -> #{}. @@ -2545,21 +2545,6 @@ init_events_counters() -> %%------------------------------------------------------------------------------ %% Start Apps %%------------------------------------------------------------------------------ - -stop_apps() -> - stopped = mnesia:stop(), - [application:stop(App) || App <- [emqx_rule_engine, emqx]]. - -start_apps() -> - [start_apps(App, SchemaFile, ConfigFile) || - {App, SchemaFile, ConfigFile} - <- [{emqx, emqx_schema, deps_path(emqx, "etc/emqx.conf")}, - {emqx_rule_engine, local_path("priv/emqx_rule_engine.schema"), - local_path("etc/emqx_rule_engine.conf")}]]. - -start_apps(App, Schema, ConfigFile) -> - emqx_ct_helpers:start_app(App, Schema, ConfigFile, fun set_special_configs/1). - deps_path(App, RelativePath) -> Path0 = code:lib_dir(App), Path = case file:read_link(Path0) of diff --git a/apps/emqx_telemetry/src/emqx_telemetry_schema.erl b/apps/emqx_telemetry/src/emqx_telemetry_schema.erl index 4d5cab684..0addd4726 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry_schema.erl +++ b/apps/emqx_telemetry/src/emqx_telemetry_schema.erl @@ -1,3 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -module(emqx_telemetry_schema). -include_lib("typerefl/include/types.hrl"). @@ -10,4 +26,4 @@ structs() -> ["emqx_telemetry"]. fields("emqx_telemetry") -> - [{enabled, emqx_schema:t(boolean(), undefined, false)}]. \ No newline at end of file + [{enabled, emqx_schema:t(boolean(), undefined, false)}]. diff --git a/data/loaded_plugins.tmpl b/data/loaded_plugins.tmpl index c2b6311f2..d26d56abf 100644 --- a/data/loaded_plugins.tmpl +++ b/data/loaded_plugins.tmpl @@ -2,5 +2,4 @@ {emqx_dashboard, true}. {emqx_modules, {{enable_plugin_emqx_modules}}}. {emqx_retainer, {{enable_plugin_emqx_retainer}}}. -{emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}. {emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}. diff --git a/rebar.config.erl b/rebar.config.erl index d0822f3b2..5f00f87b2 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -192,8 +192,7 @@ overlay_vars_rel(RelType) -> cloud -> "vm.args"; edge -> "vm.args.edge" end, - [ {enable_plugin_emqx_rule_engine, RelType =:= cloud} - , {enable_plugin_emqx_bridge_mqtt, RelType =:= edge} + [ {enable_plugin_emqx_bridge_mqtt, RelType =:= edge} , {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce , {enable_plugin_emqx_retainer, true} , {vm_args_file, VmArgs} @@ -254,6 +253,7 @@ relx_apps(ReleaseType) -> , emqx_resource , emqx_connector , emqx_data_bridge + , emqx_rule_engine ] ++ [emqx_telemetry || not is_enterprise()] ++ [emqx_modules || not is_enterprise()] @@ -286,7 +286,6 @@ relx_plugin_apps(ReleaseType) -> , emqx_stomp , emqx_authentication , emqx_web_hook - , emqx_rule_engine , emqx_statsd ] ++ relx_plugin_apps_per_rel(ReleaseType)