diff --git a/apps/emqx_connector/data/app.2021.06.01.22.32.03.config b/apps/emqx_connector/data/app.2021.06.01.22.32.03.config new file mode 100644 index 000000000..29e2a1f7a --- /dev/null +++ b/apps/emqx_connector/data/app.2021.06.01.22.32.03.config @@ -0,0 +1,4 @@ +[{connectors,[#{<<"id">> => <<"mysql-abc">>, + <<"type">> => <<"mysql_connenctor">>}, + #{<<"id">> => <<"pgsql-123">>, + <<"type">> => <<"pgsql_connenctor">>}]}]. diff --git a/apps/emqx_connector/data/vm.2021.06.01.22.32.03.args b/apps/emqx_connector/data/vm.2021.06.01.22.32.03.args new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf index d93db8c34..06f25cde4 100644 --- a/apps/emqx_connector/etc/emqx_connector.conf +++ b/apps/emqx_connector/etc/emqx_connector.conf @@ -2,18 +2,18 @@ ## EMQ X CONNECTOR Plugin ##-------------------------------------------------------------------- -## Base directory for emqx_connector indicating where to load configs from disk. -## -## Value: String -## Default: "{{ etc_dir }}/connectors/" -emqx_connectors: [ - {id: "mysql-abc", - type: mysql_connenctor, - config: {} - }, - {id: "pgsql-123", - type: pgsql_connenctor, - config: {} - }, +connectors: [ + {id: "mysql-abc" + resource_type: emqx_connector_mysql + config: { + server: "127.0.0.1:3306" + database: mqtt + pool_size: 1 + user: root + password: public + auto_reconnect: true + ssl: false + } + } ] diff --git a/apps/emqx_connector/priv/emqx_connector.schema b/apps/emqx_connector/priv/emqx_connector.schema index b8476c4d9..cdc079973 100644 --- a/apps/emqx_connector/priv/emqx_connector.schema +++ b/apps/emqx_connector/priv/emqx_connector.schema @@ -1,2 +1,7 @@ %%-*- mode: erlang -*- %% emqx_connector config mapping + +{mapping, "connectors", "connectors", [ + {default, []}, + {datatype, string} +]}. \ No newline at end of file diff --git a/apps/emqx_connector/src/emqx_connector_app.erl b/apps/emqx_connector/src/emqx_connector_app.erl index 0e64d11bc..f554cc117 100644 --- a/apps/emqx_connector/src/emqx_connector_app.erl +++ b/apps/emqx_connector/src/emqx_connector_app.erl @@ -22,7 +22,7 @@ stop(_State) -> load_config() -> case hocon:load("etc/plugins/emqx_connector.conf", #{format => map}) of - {ok, #{<<"emqx_connectors">> := Connectors}} -> + {ok, #{<<"connectors">> := Connectors}} -> lists:foreach(fun load_connector/1, Connectors); {error, Reason} -> error(Reason) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index ad9b9a4d1..c34d361ce 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -5,7 +5,9 @@ -emqx_resource_api_path("connectors/mysql"). --export([fields/1]). +-export([ fields/1 + , on_config_to_file/1 + ]). %% callbacks of behaviour emqx_resource -export([ on_start/2 @@ -18,10 +20,14 @@ -export([do_health_check/1]). +%%===================================================================== fields("config") -> emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:ssl_fields(). +on_config_to_file(#{server := Server} = Config) -> + Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}. + %% =================================================================== on_start(InstId, #{server := {Host, Port}, @@ -58,7 +64,7 @@ on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of {error, Reason} -> logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), - emqx_resource:query_failure(AfterQuery); + emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) end, diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index 8e69245b9..341901750 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -6,6 +6,7 @@ ]). -export([ to_ip_port/1 + , ip_port_to_string/1 ]). -typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}). @@ -99,4 +100,7 @@ to_ip_port(Str) -> _ -> {error, Str} end; _ -> {error, Str} - end. \ No newline at end of file + end. + +ip_port_to_string({Ip, Port}) -> + inet:ntoa(Ip) ++ ":" ++ integer_to_list(Port). diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 1f75b453e..123854bc9 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -15,7 +15,7 @@ %%-------------------------------------------------------------------- -type resource_type() :: module(). -type instance_id() :: binary(). --type resource_config() :: jsx:json_term(). +-type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). -type resource_data() :: #{ diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index af9f48cc6..13330b061 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -7,7 +7,8 @@ [kernel, stdlib, gproc, - hocon + hocon, + jsx ]}, {env,[]}, {modules, []}, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 00ea1f868..b5c1ce730 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -65,6 +65,7 @@ , call_health_check/3 %% verify if the resource is working normally , call_stop/3 %% stop the instance , call_config_merge/4 %% merge the config when updating + , call_config_to_file/2 ]). -export([ list_instances/0 %% list all the instances, id only. @@ -85,12 +86,15 @@ , on_health_check/2 , on_api_reply_format/1 , on_config_merge/3 + , on_config_to_file/1 ]). -callback on_api_reply_format(resource_data()) -> map(). -callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config(). +-callback on_config_to_file(resource_config()) -> jsx:json_term(). + %% when calling emqx_resource:start/1 -callback on_start(instance_id(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. @@ -241,6 +245,10 @@ call_stop(InstId, Mod, ResourceState) -> call_config_merge(Mod, OldConfig, NewConfig, Params) -> ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)). +-spec call_config_to_file(module(), resource_config()) -> jsx:json_term(). +call_config_to_file(Mod, Config) -> + ?SAFE_CALL(Mod:on_config_to_file(Config)). + -spec parse_config(resource_type(), binary() | term()) -> {ok, resource_config()} | {error, term()}. parse_config(ResourceType, RawConfig) when is_binary(RawConfig) -> @@ -271,7 +279,7 @@ resource_type_from_str(ResourceType) -> false -> {error, {invalid_resource, Mod}} end catch error:badarg -> - {error, {resourec_not_found, ResourceType}} + {error, {resource_not_found, ResourceType}} end. call_instance(InstId, Query) -> diff --git a/apps/emqx_resource/src/emqx_resource_api.erl b/apps/emqx_resource/src/emqx_resource_api.erl index cc32d8a11..61d162ee6 100644 --- a/apps/emqx_resource/src/emqx_resource_api.erl +++ b/apps/emqx_resource/src/emqx_resource_api.erl @@ -34,7 +34,7 @@ get(Mod, #{id := Id}, _Params) -> put(Mod, #{id := Id}, Params) -> ConfigParams = proplists:get_value(<<"config">>, Params), - ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params), + ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params, #{}), case emqx_resource:resource_type_from_str(ResourceTypeStr) of {ok, ResourceType} -> do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params); diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 3a750ebf3..ff7158c9c 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -110,8 +110,8 @@ load_config(RawConfig) when is_binary(RawConfig) -> Error -> Error end; -load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr, - <<"config">> := MapConfig}) -> +load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr} = Config) -> + MapConfig = maps:get(<<"config">>, Config, #{}), case emqx_resource:resource_type_from_str(ResourceTypeStr) of {ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig); Error -> Error @@ -130,8 +130,11 @@ create_local(InstId, ResourceType, InstConf) -> end. save_config_to_disk(InstId, ResourceType, Config) -> + %% TODO: send an event to the config handler, and the hander (single process) + %% will dump configs for all instances (from an ETS table) to a file. file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]), - jsx:encode(#{id => InstId, resource_type => ResourceType, config => Config})). + jsx:encode(#{id => InstId, resource_type => ResourceType, + config => emqx_resource:call_config_to_file(Config)})). emqx_data_dir() -> "data".