diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl
index 12fda5d51..36ac10716 100644
--- a/apps/emqx_bridge/src/emqx_action_info.erl
+++ b/apps/emqx_bridge/src/emqx_action_info.erl
@@ -110,6 +110,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_es_action_info,
emqx_bridge_opents_action_info,
emqx_bridge_rabbitmq_action_info,
+ emqx_bridge_pulsar_action_info,
emqx_bridge_greptimedb_action_info,
emqx_bridge_tdengine_action_info,
emqx_bridge_s3_action_info
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 35b964b83..69b17a843 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -761,7 +761,7 @@ is_bridge_enabled(BridgeType, BridgeName) ->
end.
is_bridge_enabled_v1(BridgeType, BridgeName) ->
- %% we read from the transalted config because the defaults are populated here.
+ %% we read from the translated config because the defaults are populated here.
try emqx:get_config([bridges, BridgeType, binary_to_existing_atom(BridgeName)]) of
ConfMap ->
maps:get(enable, ConfMap, false)
diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl
index 56fe0029a..be20b9a7b 100644
--- a/apps/emqx_bridge/src/emqx_bridge_v2.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl
@@ -1659,8 +1659,11 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
connector_conf := ConnectorRawConf,
bridge_v2_type := BridgeV2Type,
bridge_v2_name := _BridgeName,
- bridge_v2_conf := BridgeV2RawConf
+ bridge_v2_conf := BridgeV2RawConf0
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
+ BridgeV2RawConf = emqx_action_info:action_convert_from_connector(
+ BridgeType, ConnectorRawConf, BridgeV2RawConf0
+ ),
create_dry_run_helper(
ensure_atom_root_key(ConfRootKey), BridgeV2Type, ConnectorRawConf, BridgeV2RawConf
)
@@ -1928,7 +1931,8 @@ convert_from_connectors(ConfRootKey, Conf) ->
convert_from_connector(ConfRootKey, Type, Name, Action = #{<<"connector">> := ConnectorName}) ->
case get_connector_info(ConnectorName, Type) of
{ok, Connector} ->
- Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action),
+ TypeAtom = to_existing_atom(Type),
+ Action1 = emqx_action_info:action_convert_from_connector(TypeAtom, Connector, Action),
{ok, Action1};
{error, not_found} ->
{error, #{
diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl
index faac94dcb..233d87fa1 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl
@@ -123,7 +123,7 @@ resource_type(dynamo) -> emqx_bridge_dynamo_connector;
resource_type(rocketmq) -> emqx_bridge_rocketmq_connector;
resource_type(sqlserver) -> emqx_bridge_sqlserver_connector;
resource_type(opents) -> emqx_bridge_opents_connector;
-resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
+resource_type(pulsar_producer) -> emqx_bridge_pulsar_connector;
resource_type(oracle) -> emqx_oracle;
resource_type(iotdb) -> emqx_bridge_iotdb_connector;
resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;
diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl
index 6d5ae4f4c..7233e9e6c 100644
--- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl
+++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl
@@ -308,7 +308,7 @@ fields(Field) when
Fields = fields("specific_connector_config"),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(What) ->
- error({emqx_bridge_mqtt_connector_schema, missing_field_handler, What}).
+ error({?MODULE, missing_field_handler, What}).
ingress_pool_size(desc) ->
?DESC("ingress_pool_size");
diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl
index 2b9bb05bd..60cf634c4 100644
--- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl
+++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl
@@ -124,7 +124,7 @@ fields(Field) when
->
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields("mqtt_subscriber_source"));
fields(What) ->
- error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
+ error({?MODULE, missing_field_handler, What}).
%% v2: api schema
%% The parameter equls to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
index c9abebf8b..ce7c313ae 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"},
- {vsn, "0.1.8"},
+ {vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
index f9f37846e..291c656ef 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
@@ -31,7 +31,21 @@ roots() ->
[].
fields(pulsar_producer) ->
- fields(config) ++ fields(producer_opts);
+ fields(config) ++
+ emqx_bridge_pulsar_pubsub_schema:fields(action_parameters) ++
+ fields(producer_opts) ++
+ [
+ {local_topic,
+ mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
+ {resource_opts,
+ mk(
+ ref(producer_resource_opts),
+ #{
+ required => false,
+ desc => ?DESC(emqx_resource_schema, "creation_opts")
+ }
+ )}
+ ];
fields(config) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@@ -85,10 +99,6 @@ fields(producer_opts) ->
mk(emqx_schema:bytesize(), #{
default => <<"1MB">>, desc => ?DESC("producer_send_buffer")
})},
- {sync_timeout,
- mk(emqx_schema:timeout_duration_ms(), #{
- default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
- })},
{retention_period,
mk(
%% not used in a `receive ... after' block, just timestamp comparison
@@ -100,26 +110,13 @@ fields(producer_opts) ->
emqx_schema:bytesize(),
#{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")}
)},
- {local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
{pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})},
{strategy,
mk(
hoconsc:enum([random, roundrobin, key_dispatch]),
#{default => random, desc => ?DESC("producer_strategy")}
)},
- {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})},
- {message,
- mk(ref(producer_pulsar_message), #{
- required => false, desc => ?DESC("producer_message_opts")
- })},
- {resource_opts,
- mk(
- ref(producer_resource_opts),
- #{
- required => false,
- desc => ?DESC(emqx_resource_schema, "creation_opts")
- }
- )}
+ {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})}
];
fields(producer_buffer) ->
[
@@ -144,12 +141,6 @@ fields(producer_buffer) ->
desc => ?DESC("buffer_memory_overload_protection")
})}
];
-fields(producer_pulsar_message) ->
- [
- {key,
- mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC("producer_key_template")})},
- {value, mk(string(), #{default => <<"${.}">>, desc => ?DESC("producer_value_template")})}
- ];
fields(producer_resource_opts) ->
SupportedOpts = [
health_check_interval,
@@ -225,8 +216,8 @@ producer_strategy_key_validator(
producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
producer_strategy_key_validator(#{
<<"strategy">> := key_dispatch,
- <<"message">> := #{<<"key">> := ""}
-}) ->
+ <<"message">> := #{<<"key">> := Key}
+}) when Key =:= "" orelse Key =:= <<>> ->
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
producer_strategy_key_validator(_) ->
ok.
@@ -248,8 +239,7 @@ struct_names() ->
[
auth_basic,
auth_token,
- producer_buffer,
- producer_pulsar_message
+ producer_buffer
].
override_default(OriginalFn, NewDefault) ->
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl
new file mode 100644
index 000000000..f51ed7884
--- /dev/null
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl
@@ -0,0 +1,54 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_pulsar_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+ bridge_v1_type_name/0,
+ action_type_name/0,
+ connector_type_name/0,
+ schema_module/0,
+ is_action/1,
+ action_convert_from_connector/2
+]).
+
+is_action(_) -> true.
+
+bridge_v1_type_name() -> pulsar_producer.
+
+action_type_name() -> pulsar.
+
+connector_type_name() -> pulsar.
+
+schema_module() -> emqx_bridge_pulsar_pubsub_schema.
+
+action_convert_from_connector(ConnectorConfig, ActionConfig) ->
+ Dispatch = emqx_utils_conv:bin(maps:get(<<"strategy">>, ConnectorConfig, <<>>)),
+ case Dispatch of
+ <<"key_dispatch">> ->
+ case emqx_utils_maps:deep_find([<<"parameters">>, <<"message">>], ActionConfig) of
+ {ok, Message} ->
+ Validator =
+ #{
+ <<"strategy">> => key_dispatch,
+ <<"message">> => emqx_utils_maps:binary_key_map(Message)
+ },
+ case emqx_bridge_pulsar:producer_strategy_key_validator(Validator) of
+ ok ->
+ ActionConfig;
+ {error, Reason} ->
+ throw(#{
+ reason => Reason,
+ kind => validation_error
+ })
+ end;
+ {not_found, _, _} ->
+ %% no message field, use the default message template
+ ActionConfig
+ end;
+ _ ->
+ ActionConfig
+ end.
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl
similarity index 81%
rename from apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
rename to apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl
index 2098cfeba..7b080d0e6 100644
--- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl
@@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
--module(emqx_bridge_pulsar_impl_producer).
+-module(emqx_bridge_pulsar_connector).
-include("emqx_bridge_pulsar.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
@@ -13,8 +13,12 @@
callback_mode/0,
query_mode/1,
on_start/2,
+ on_add_channel/4,
+ on_remove_channel/3,
+ on_get_channels/1,
on_stop/2,
on_get_status/2,
+ on_get_channel_status/3,
on_query/3,
on_query_async/4
]).
@@ -23,8 +27,7 @@
-type state() :: #{
pulsar_client_id := pulsar_client_id(),
producers := pulsar_producers:producers(),
- sync_timeout := erlang:timeout(),
- message_template := message_template()
+ channels := map()
}.
-type buffer_mode() :: memory | disk | hybrid.
-type compression_mode() :: no_compression | snappy | zlib.
@@ -77,16 +80,12 @@ query_mode(_Config) ->
-spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) ->
- #{
- bridge_name := BridgeName,
- servers := Servers0,
- ssl := SSL
- } = Config,
+ #{servers := Servers0, ssl := SSL} = Config,
Servers = format_servers(Servers0),
- ClientId = make_client_id(InstanceId, BridgeName),
+ ClientId = make_client_id(InstanceId),
ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId),
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
- ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
+ ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(10)),
ClientOpts = #{
connect_timeout => ConnectTimeout,
ssl_opts => SSLOpts,
@@ -119,6 +118,30 @@ on_start(InstanceId, Config) ->
end,
start_producer(Config, InstanceId, ClientId, ClientOpts).
+on_add_channel(
+ _InstanceId,
+ #{channels := Channels} = State,
+ ChannelId,
+ #{parameters := #{message := Message, sync_timeout := SyncTimeout}}
+) ->
+ case maps:is_key(ChannelId, Channels) of
+ true ->
+ {error, already_exists};
+ false ->
+ Parameters = #{
+ message => compile_message_template(Message),
+ sync_timeout => SyncTimeout
+ },
+ NewChannels = maps:put(ChannelId, Parameters, Channels),
+ {ok, State#{channels => NewChannels}}
+ end.
+
+on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
+ {ok, State#{channels => maps:remove(ChannelId, Channels)}}.
+
+on_get_channels(InstanceId) ->
+ emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
-spec on_stop(resource_id(), state()) -> ok.
on_stop(InstanceId, _State) ->
case emqx_resource:get_allocated_resources(InstanceId) of
@@ -174,76 +197,77 @@ on_get_status(_InstanceId, _State) ->
%% create the bridge is not quite finished, `State = undefined'.
connecting.
--spec on_query(resource_id(), {send_message, map()}, state()) ->
+on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
+ case maps:is_key(ChannelId, Channels) of
+ true -> connected;
+ false -> {error, channel_not_exists}
+ end.
+
+-spec on_query(resource_id(), tuple(), state()) ->
{ok, term()}
| {error, timeout}
| {error, term()}.
-on_query(_InstanceId, {send_message, Message}, State) ->
- #{
- producers := Producers,
- sync_timeout := SyncTimeout,
- message_template := MessageTemplate
- } = State,
- PulsarMessage = render_message(Message, MessageTemplate),
- try
- pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
- catch
- error:timeout ->
- {error, timeout}
+on_query(_InstanceId, {ChannelId, Message}, State) ->
+ #{producers := Producers, channels := Channels} = State,
+ case maps:find(ChannelId, Channels) of
+ error ->
+ {error, channel_not_exists};
+ {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout}} ->
+ PulsarMessage = render_message(Message, MessageTmpl),
+ try
+ pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
+ catch
+ error:timeout ->
+ {error, timeout}
+ end
end.
-spec on_query_async(
- resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state()
+ resource_id(), tuple(), {ReplyFun :: function(), Args :: list()}, state()
) ->
{ok, pid()}.
-on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
- ?tp_span(
- pulsar_producer_on_query_async,
- #{instance_id => _InstanceId, message => Message},
- do_on_query_async(Message, AsyncReplyFn, State)
- ).
+on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
+ #{producers := Producers, channels := Channels} = State,
+ case maps:find(ChannelId, Channels) of
+ error ->
+ {error, channel_not_exists};
+ {ok, #{message := MessageTmpl}} ->
+ ?tp_span(
+ pulsar_producer_on_query_async,
+ #{instance_id => _InstanceId, message => Message},
+ on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn)
+ )
+ end.
-do_on_query_async(Message, AsyncReplyFn, State) ->
- #{
- producers := Producers,
- message_template := MessageTemplate
- } = State,
- PulsarMessage = render_message(Message, MessageTemplate),
+on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) ->
+ PulsarMessage = render_message(Message, MessageTmpl),
pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
%%-------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------
--spec to_bin(atom() | string() | binary()) -> binary().
-to_bin(A) when is_atom(A) ->
- atom_to_binary(A);
-to_bin(L) when is_list(L) ->
- list_to_binary(L);
-to_bin(B) when is_binary(B) ->
- B.
-
-spec format_servers(binary()) -> [string()].
format_servers(Servers0) ->
- Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS),
lists:map(
fun(#{scheme := Scheme, hostname := Host, port := Port}) ->
Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port)
end,
- Servers1
+ emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS)
).
--spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id().
-make_client_id(InstanceId, BridgeName) ->
+-spec make_client_id(resource_id()) -> pulsar_client_id().
+make_client_id(InstanceId) ->
case is_dry_run(InstanceId) of
true ->
pulsar_producer_probe;
false ->
+ {pulsar, Name} = emqx_connector_resource:parse_connector_id(InstanceId),
ClientIdBin = iolist_to_binary([
- <<"pulsar_producer:">>,
- to_bin(BridgeName),
+ <<"pulsar:">>,
+ emqx_utils_conv:bin(Name),
<<":">>,
- to_bin(node())
+ emqx_utils_conv:bin(node())
]),
binary_to_atom(ClientIdBin)
end.
@@ -252,10 +276,8 @@ make_client_id(InstanceId, BridgeName) ->
is_dry_run(InstanceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
case TestIdStart of
- nomatch ->
- false;
- _ ->
- string:equal(TestIdStart, InstanceId)
+ nomatch -> false;
+ _ -> string:equal(TestIdStart, InstanceId)
end.
conn_opts(#{authentication := none}) ->
@@ -275,11 +297,11 @@ conn_opts(#{authentication := #{jwt := JWT}}) ->
-spec replayq_dir(pulsar_client_id()) -> string().
replayq_dir(ClientId) ->
- filename:join([emqx:data_dir(), "pulsar", to_bin(ClientId)]).
+ filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]).
-spec producer_name(pulsar_client_id()) -> atom().
producer_name(ClientId) ->
- ClientIdBin = to_bin(ClientId),
+ ClientIdBin = emqx_utils_conv:bin(ClientId),
binary_to_atom(
iolist_to_binary([
<<"producer-">>,
@@ -303,12 +325,10 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
},
compression := Compression,
max_batch_bytes := MaxBatchBytes,
- message := MessageTemplateOpts,
pulsar_topic := PulsarTopic0,
retention_period := RetentionPeriod,
send_buffer := SendBuffer,
- strategy := Strategy,
- sync_timeout := SyncTimeout
+ strategy := Strategy
} = Config,
{OffloadMode, ReplayQDir} =
case BufferMode of
@@ -330,7 +350,6 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
},
ProducerName = producer_name(ClientId),
?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}),
- MessageTemplate = compile_message_template(MessageTemplateOpts),
ProducerOpts0 =
#{
batch_size => BatchSize,
@@ -353,8 +372,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
State = #{
pulsar_client_id => ClientId,
producers => Producers,
- sync_timeout => SyncTimeout,
- message_template => MessageTemplate
+ channels => #{}
},
?tp(pulsar_producer_bridge_started, #{}),
{ok, State}
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl
new file mode 100644
index 000000000..953318e0a
--- /dev/null
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl
@@ -0,0 +1,71 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_pulsar_connector_schema).
+
+-export([namespace/0, roots/0, fields/1, desc/1]).
+-export([connector_examples/1, connector_example_values/0]).
+
+-include("emqx_bridge_pulsar.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-define(TYPE, pulsar).
+
+namespace() -> ?TYPE.
+
+roots() -> [].
+
+fields("config_connector") ->
+ lists:keydelete(enable, 1, emqx_bridge_schema:common_bridge_fields()) ++
+ emqx_bridge_pulsar:fields(config) ++
+ emqx_bridge_pulsar:fields(producer_opts) ++
+ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector_resource_opts) ->
+ emqx_connector_schema:resource_opts_fields();
+fields("post") ->
+ emqx_connector_schema:type_and_name_fields(?TYPE) ++ fields("config_connector");
+fields("put") ->
+ fields("config_connector");
+fields("get") ->
+ emqx_bridge_schema:status_fields() ++ fields("config_connector").
+
+desc("config_connector") ->
+ ?DESC(emqx_bridge_pulsar, "config_connector");
+desc(connector_resource_opts) ->
+ ?DESC(emqx_bridge_pulsar, connector_resource_opts);
+desc(_) ->
+ undefined.
+
+connector_examples(Method) ->
+ [
+ #{
+ <<"pulsar">> =>
+ #{
+ summary => <<"Pulsar Connector">>,
+ value => emqx_connector_schema:connector_values(
+ Method, ?TYPE, connector_example_values()
+ )
+ }
+ }
+ ].
+
+connector_example_values() ->
+ #{
+ name => <<"pulsar_connector">>,
+ type => ?TYPE,
+ enable => true,
+ servers => <<"pulsar://127.0.0.1:6650">>,
+ authentication => none,
+ connect_timeout => <<"5s">>,
+ batch_size => 10,
+ compression => no_compression,
+ send_buffer => <<"1MB">>,
+ retention_period => <<"100s">>,
+ max_batch_bytes => <<"32MB">>,
+ pulsar_topic => <<"test_topic">>,
+ strategy => random,
+ buffer => #{mode => memory},
+ ssl => #{enable => false}
+ }.
diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl
new file mode 100644
index 000000000..a705ed560
--- /dev/null
+++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl
@@ -0,0 +1,123 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_pulsar_pubsub_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-export([roots/0, fields/1, desc/1, namespace/0]).
+
+-export([bridge_v2_examples/1]).
+
+-define(ACTION_TYPE, pulsar).
+-define(CONNECTOR_SCHEMA, emqx_bridge_rabbitmq_connector_schema).
+
+namespace() -> "pulsar".
+
+roots() -> [].
+
+fields(action) ->
+ {pulsar,
+ ?HOCON(
+ ?MAP(name, ?R_REF(publisher_action)),
+ #{
+ desc => <<"Pulsar Action Config">>,
+ required => false
+ }
+ )};
+fields(publisher_action) ->
+ emqx_bridge_v2_schema:make_producer_action_schema(
+ ?HOCON(
+ ?R_REF(action_parameters),
+ #{
+ required => true,
+ desc => ?DESC(action_parameters)
+ }
+ ),
+ #{resource_opts_ref => ?R_REF(action_resource_opts)}
+ );
+fields(action_parameters) ->
+ [
+ {sync_timeout,
+ ?HOCON(emqx_schema:timeout_duration_ms(), #{
+ default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
+ })},
+ {message,
+ ?HOCON(?R_REF(producer_pulsar_message), #{
+ required => false, desc => ?DESC("producer_message_opts")
+ })}
+ ];
+fields(producer_pulsar_message) ->
+ [
+ {key,
+ ?HOCON(string(), #{
+ default => <<"${.clientid}">>,
+ desc => ?DESC("producer_key_template")
+ })},
+ {value,
+ ?HOCON(string(), #{
+ default => <<"${.}">>,
+ desc => ?DESC("producer_value_template")
+ })}
+ ];
+fields(action_resource_opts) ->
+ UnsupportedOpts = [
+ batch_size,
+ batch_time,
+ worker_pool_size,
+ request_ttl,
+ inflight_window,
+ max_buffer_bytes,
+ query_mode
+ ],
+ lists:filter(
+ fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
+ emqx_bridge_v2_schema:action_resource_opts_fields()
+ );
+fields(Field) when
+ Field == "get_bridge_v2";
+ Field == "post_bridge_v2";
+ Field == "put_bridge_v2"
+->
+ emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(publisher_action));
+fields(What) ->
+ error({?MODULE, missing_field_handler, What}).
+
+desc("config") ->
+ ?DESC("desc_config");
+desc(action_resource_opts) ->
+ ?DESC(emqx_resource_schema, "creation_opts");
+desc(action_parameters) ->
+ ?DESC(action_parameters);
+desc(producer_pulsar_message) ->
+ ?DESC("producer_message_opts");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+ ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
+desc(publisher_action) ->
+ ?DESC(publisher_action);
+desc(_) ->
+ undefined.
+
+bridge_v2_examples(Method) ->
+ [
+ #{
+ <<"pulsar">> => #{
+ summary => <<"Pulsar Producer Action">>,
+ value => emqx_bridge_v2_schema:action_values(
+ Method,
+ _ActionType = ?ACTION_TYPE,
+ _ConnectorType = pulsar,
+ #{
+ parameters => #{
+ sync_timeout => <<"5s">>,
+ message => #{
+ key => <<"${.clientid}">>,
+ value => <<"${.}">>
+ }
+ }
+ }
+ )
+ }
+ }
+ ].
diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl
similarity index 97%
rename from apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl
rename to apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl
index dfc5af3a7..c9b25cc71 100644
--- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl
+++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl
@@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
--module(emqx_bridge_pulsar_impl_producer_SUITE).
+-module(emqx_bridge_pulsar_connector_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
@@ -550,7 +550,6 @@ kill_resource_managers() ->
t_start_and_produce_ok(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
- ResourceId = resource_id(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
@@ -600,6 +599,13 @@ t_start_and_produce_ok(Config) ->
_Sleep = 100,
_Attempts0 = 20,
begin
+ BridgeId = emqx_bridge_resource:bridge_id(
+ <<"pulsar">>, ?config(pulsar_name, Config)
+ ),
+ ConnectorId = emqx_bridge_resource:resource_id(
+ <<"pulsar">>, ?config(pulsar_name, Config)
+ ),
+ Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>,
?assertMatch(
#{
counters := #{
@@ -612,7 +618,7 @@ t_start_and_produce_ok(Config) ->
success := 2
}
},
- emqx_resource_manager:get_metrics(ResourceId)
+ emqx_resource:get_metrics(Id)
),
?assertEqual(
1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')
@@ -631,17 +637,22 @@ t_start_and_produce_ok(Config) ->
%% Under normal operations, the bridge will be called async via
%% `simple_async_query'.
t_sync_query(Config) ->
- ResourceId = resource_id(Config),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
+ ResourceId = resource_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
- Message = {send_message, #{payload => Payload}},
+ BridgeId = emqx_bridge_resource:bridge_id(<<"pulsar">>, ?config(pulsar_name, Config)),
+ ConnectorId = emqx_bridge_resource:resource_id(
+ <<"pulsar">>, ?config(pulsar_name, Config)
+ ),
+ Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>,
+ Message = {Id, #{payload => Payload}},
?assertMatch(
{ok, #{sequence_id := _}}, emqx_resource:simple_sync_query(ResourceId, Message)
),
@@ -688,13 +699,13 @@ t_create_via_http(Config) ->
t_start_stop(Config) ->
PulsarName = ?config(pulsar_name, Config),
- ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch(
{ok, _},
create_bridge(Config)
),
+ ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
@@ -745,11 +756,11 @@ t_on_get_status(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
- ResourceId = resource_id(Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
+ ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
@@ -777,7 +788,6 @@ t_start_when_down(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
- ResourceId = resource_id(Config),
?check_trace(
begin
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
@@ -787,6 +797,7 @@ t_start_when_down(Config) ->
),
ok
end),
+ ResourceId = resource_id(Config),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
@@ -902,7 +913,6 @@ t_failure_to_start_producer(Config) ->
%% die for whatever reason.
t_producer_process_crash(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
- ResourceId = resource_id(Config),
QoS = 0,
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
@@ -934,6 +944,7 @@ t_producer_process_crash(Config) ->
ok
after 1_000 -> ct:fail("pid didn't die")
end,
+ ResourceId = resource_id(Config),
?retry(
_Sleep0 = 50,
_Attempts0 = 50,
@@ -995,8 +1006,8 @@ t_resource_manager_crash_after_producers_started(Config) ->
Producers =/= undefined,
10_000
),
- ?assertMatch(ok, delete_bridge(Config)),
?assertEqual([], get_pulsar_producers()),
+ ?assertMatch({error, bridge_not_found}, delete_bridge(Config)),
ok
end,
[]
@@ -1028,8 +1039,8 @@ t_resource_manager_crash_before_producers_started(Config) ->
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
10_000
),
- ?assertMatch(ok, delete_bridge(Config)),
?assertEqual([], get_pulsar_producers()),
+ ?assertMatch({error, bridge_not_found}, delete_bridge(Config)),
ok
end,
[]
@@ -1046,7 +1057,7 @@ t_strategy_key_validation(Config) ->
<<"reason">> := <<"Message key cannot be empty", _/binary>>
}
}}},
- probe_bridge_api(
+ create_bridge_api(
Config,
#{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
)
@@ -1060,7 +1071,7 @@ t_strategy_key_validation(Config) ->
<<"reason">> := <<"Message key cannot be empty", _/binary>>
}
}}},
- create_bridge_api(
+ probe_bridge_api(
Config,
#{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
)
@@ -1075,7 +1086,6 @@ do_t_cluster(Config) ->
?check_trace(
begin
MQTTTopic = ?config(mqtt_topic, Config),
- ResourceId = resource_id(Config),
Nodes = [N1, N2 | _] = cluster(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
@@ -1095,6 +1105,7 @@ do_t_cluster(Config) ->
),
25_000
),
+ ResourceId = erpc:call(N1, ?MODULE, resource_id, [Config]),
lists:foreach(
fun(N) ->
?retry(
@@ -1147,12 +1158,12 @@ t_resilience(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
- ResourceId = resource_id(Config),
?check_trace(
begin
{ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+ ResourceId = resource_id(Config),
?retry(
_Sleep0 = 1_000,
_Attempts0 = 20,
diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl
index 3fb00632c..9a9741226 100644
--- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl
+++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl
@@ -170,7 +170,7 @@ fields(Field) when
->
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(subscriber_source));
fields(What) ->
- error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
+ error({?MODULE, missing_field_handler, What}).
%% v2: api schema
%% The parameter equals to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
index 23bc5a8b4..15b196af4 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
@@ -74,6 +74,8 @@ resource_type(greptimedb) ->
emqx_bridge_greptimedb_connector;
resource_type(tdengine) ->
emqx_bridge_tdengine_connector;
+resource_type(pulsar) ->
+ emqx_bridge_pulsar_connector;
resource_type(rabbitmq) ->
emqx_bridge_rabbitmq_connector;
resource_type(s3) ->
@@ -94,6 +96,8 @@ connector_impl_module(elasticsearch) ->
emqx_bridge_es_connector;
connector_impl_module(opents) ->
emqx_bridge_opents_connector;
+connector_impl_module(pulsar) ->
+ emqx_bridge_pulsar_connector;
connector_impl_module(tdengine) ->
emqx_bridge_tdengine_connector;
connector_impl_module(rabbitmq) ->
@@ -317,6 +321,14 @@ connector_structs() ->
required => false
}
)},
+ {pulsar,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_pulsar_connector_schema, "config_connector")),
+ #{
+ desc => <<"Pulsar Connector Config">>,
+ required => false
+ }
+ )},
{rabbitmq,
mk(
hoconsc:map(name, ref(emqx_bridge_rabbitmq_connector_schema, "config_connector")),
@@ -361,6 +373,7 @@ schema_modules() ->
emqx_bridge_iotdb_connector,
emqx_bridge_es_connector,
emqx_bridge_rabbitmq_connector_schema,
+ emqx_bridge_pulsar_connector_schema,
emqx_bridge_opents_connector,
emqx_bridge_greptimedb,
emqx_bridge_tdengine_connector,
@@ -410,6 +423,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),
api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method),
+ api_ref(emqx_bridge_pulsar_connector_schema, <<"pulsar">>, Method),
api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"),
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method),
api_ref(emqx_bridge_s3, <<"s3">>, Method ++ "_connector")
diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
index fc68bbd9d..430a74bdb 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
@@ -174,6 +174,8 @@ connector_type_to_bridge_types(opents) ->
[opents];
connector_type_to_bridge_types(greptimedb) ->
[greptimedb];
+connector_type_to_bridge_types(pulsar) ->
+ [pulsar_producer, pulsar];
connector_type_to_bridge_types(tdengine) ->
[tdengine];
connector_type_to_bridge_types(rabbitmq) ->
@@ -269,6 +271,7 @@ split_bridge_to_connector_and_action(
#{<<"connector">> := ConnectorName0} -> ConnectorName0;
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
end,
+
OrgActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeType),
{ActionMap, ActionType, ActionOrSource} =
case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of
diff --git a/rel/i18n/emqx_bridge_pulsar.hocon b/rel/i18n/emqx_bridge_pulsar.hocon
index e1b6153d3..913ab8d2a 100644
--- a/rel/i18n/emqx_bridge_pulsar.hocon
+++ b/rel/i18n/emqx_bridge_pulsar.hocon
@@ -1,180 +1,173 @@
emqx_bridge_pulsar {
- auth_basic {
- desc = "Parameters for basic authentication."
- label = "Basic auth params"
- }
- auth_basic_password {
- desc = "Basic authentication password."
- label = "Password"
- }
+config_connector.desc:
+"""Pulsar connector config"""
+config_connector.label:
+"""Pulsar Connector"""
- auth_basic_username {
- desc = "Basic authentication username."
- label = "Username"
- }
+connector_resource_opts.desc:
+"""Pulsar connector resource options"""
+connector_resource_opts.label:
+"""Resource Options"""
- auth_token {
- desc = "Parameters for token authentication."
- label = "Token auth params"
- }
+auth_basic.desc:
+ """Parameters for basic authentication."""
+auth_basic.label:
+"""Basic auth params"""
- auth_token_jwt {
- desc = "JWT authentication token."
- label = "JWT"
- }
+auth_basic_password.desc:
+"""Basic authentication password."""
+auth_basic_password.label:
+"""Password"""
- authentication {
- desc = "Authentication configs."
- label = "Authentication"
- }
+auth_basic_username.desc:
+"""Basic authentication username."""
+auth_basic_username.label:
+"""Username"""
- buffer_memory_overload_protection {
- desc = "Applicable when buffer mode is set to memory\n"
- "EMQX will drop old buffered messages under high memory pressure."
- " The high memory threshold is defined in config sysmon.os.sysmem_high_watermark."
- " NOTE: This config only works on Linux."
- label = "Memory Overload Protection"
- }
+auth_token.desc:
+"""Parameters for token authentication."""
+auth_token.label:
+"""Token auth params"""
- buffer_mode {
- desc = "Message buffer mode.\n"
- "memory: Buffer all messages in memory. The messages will be lost"
- " in case of EMQX node restart\ndisk: Buffer all messages on disk."
- " The messages on disk are able to survive EMQX node restart.\n"
- "hybrid: Buffer message in memory first, when up to certain limit"
- " (see segment_bytes config for more information), then start offloading"
- " messages to disk, Like memory mode, the messages will be lost in"
- " case of EMQX node restart."
- label = "Buffer Mode"
- }
+auth_token_jwt.desc:
+"""JWT authentication token."""
+auth_token_jwt.label:
+"""JWT"""
- buffer_per_partition_limit {
- desc = "Number of bytes allowed to buffer for each Pulsar partition."
- " When this limit is exceeded, old messages will be dropped in a trade for credits"
- " for new messages to be buffered."
- label = "Per-partition Buffer Limit"
- }
+authentication.desc:
+"""Authentication configs."""
+authentication.label:
+"""Authentication"""
- buffer_segment_bytes {
- desc = "Applicable when buffer mode is set to disk or hybrid.\n"
- "This value is to specify the size of each on-disk buffer file."
- label = "Segment File Bytes"
- }
+buffer_memory_overload_protection.desc:
+"""Applicable when buffer mode is set to memory
+EMQX will drop old buffered messages under high memory pressure.
+The high memory threshold is defined in config sysmon.os.sysmem_high_watermark.
+ NOTE: This config only works on Linux."""
+buffer_memory_overload_protection.label:
+"""Memory Overload Protection"""
- config_enable {
- desc = "Enable (true) or disable (false) this Pulsar bridge."
- label = "Enable or Disable"
- }
+buffer_mode.desc:
+"""Message buffer mode.
+memory: Buffer all messages in memory. The messages will be lost
+ in case of EMQX node restart\ndisk: Buffer all messages on disk.
+ The messages on disk are able to survive EMQX node restart.
+hybrid: Buffer message in memory first, when up to certain limit
+ (see segment_bytes config for more information), then start offloading
+ messages to disk, Like memory mode, the messages will be lost in
+ case of EMQX node restart."""
+buffer_mode.label:
+"""Buffer Mode"""
- connect_timeout {
- desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
- label = "Connect Timeout"
- }
+buffer_per_partition_limit.desc:
+"""Number of bytes allowed to buffer for each Pulsar partition.
+ When this limit is exceeded, old messages will be dropped in a trade for credits
+ for new messages to be buffered."""
+ buffer_per_partition_limit.label:
+"""Per-partition Buffer Limit"""
- desc_name {
- desc = "Action name, a human-readable identifier."
- label = "Action Name"
- }
+desc_name.desc:
+"""Action name, a human-readable identifier."""
+desc_name.label:
+"""Action Name"""
- desc_type {
- desc = "The Bridge Type"
- label = "Bridge Type"
- }
+buffer_segment_bytes.desc:
+"""Applicable when buffer mode is set to disk or hybrid.
+This value is to specify the size of each on-disk buffer file."""
+buffer_segment_bytes.label:
+"""Segment File Bytes"""
- producer_batch_size {
- desc = "Maximum number of individual requests to batch in a Pulsar message."
- label = "Batch size"
- }
+config_enable.desc:
+"""Enable (true) or disable (false) this Pulsar bridge."""
+config_enable.label:
+"""Enable or Disable"""
- producer_buffer {
- desc = "Configure producer message buffer.\n\n"
- "Tell Pulsar producer how to buffer messages when EMQX has more messages to"
- " send than Pulsar can keep up, or when Pulsar is down."
- label = "Message Buffer"
- }
+connect_timeout.desc:
+"""Maximum wait time for TCP connection establishment (including authentication time if enabled)."""
+connect_timeout.label:
+"""Connect Timeout"""
- producer_compression {
- desc = "Compression method."
- label = "Compression"
- }
+desc_name.desc:
+"""Bridge name, used as a human-readable description of the bridge."""
+desc_name.label:
+"""Bridge Name"""
- producer_key_template {
- desc = "Template to render Pulsar message key."
- label = "Message Key"
- }
+desc_type.desc:
+"""The Bridge Type"""
+desc_type.label:
+"""Bridge Type"""
- producer_local_topic {
- desc = "MQTT topic or topic filter as data source (bridge input)."
- " If rule action is used as data source, this config should be left empty,"
- " otherwise messages will be duplicated in Pulsar."
- label = "Source MQTT Topic"
- }
+producer_batch_size.desc:
+"""Maximum number of individual requests to batch in a Pulsar message."""
+producer_batch_size.label:
+"""Batch size"""
- producer_max_batch_bytes {
- desc = "Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers"
- " default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in"
- " order to compensate Pulsar message encoding overheads (especially when each individual"
- " message is very small). When a single message is over the limit, it is still"
- " sent (as a single element batch)."
- label = "Max Batch Bytes"
- }
+producer_buffer.desc:
+"""Configure producer message buffer."
+Tell Pulsar producer how to buffer messages when EMQX has more messages to"
+ send than Pulsar can keep up, or when Pulsar is down."""
+producer_buffer.label:
+"""Message Buffer"""
- producer_message_opts {
- desc = "Template to render a Pulsar message."
- label = "Pulsar Message Template"
- }
+producer_compression.desc:
+"""Compression method."""
+producer_compression.label:
+"""Compression"""
- producer_pulsar_message {
- desc = "Template to render a Pulsar message."
- label = "Pulsar Message Template"
- }
+producer_local_topic.desc:
+"""MQTT topic or topic filter as data source (bridge input)
+ If rule action is used as data source, this config should be left empty,
+ otherwise messages will be duplicated in Pulsar."""
+producer_local_topic.label:
+"""Source MQTT Topic"""
- producer_pulsar_topic {
- desc = "Pulsar topic name"
- label = "Pulsar topic name"
- }
+producer_max_batch_bytes.desc:
+"""Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers
+ default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in
+ order to compensate Pulsar message encoding overheads (especially when each individual
+ message is very small). When a single message is over the limit, it is still
+ sent (as a single element batch)."""
+producer_max_batch_bytes.label:
+"""Max Batch Bytes"""
- producer_retention_period {
- desc = "The amount of time messages will be buffered while there is no connection to"
- " the Pulsar broker. Longer times mean that more memory/disk will be used"
- label = "Retention Period"
- }
- producer_send_buffer {
- desc = "Fine tune the socket send buffer. The default value is tuned for high throughput."
- label = "Socket Send Buffer Size"
- }
+producer_pulsar_topic.desc:
+"""Pulsar topic name"""
+producer_pulsar_topic.label:
+"""Pulsar topic name"""
- producer_strategy {
- desc = "Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.\n"
- "\n"
- "random: Randomly pick a partition for each message.\n"
- "roundrobin: Pick each available producer in turn for each message.\n"
- "key_dispatch: Hash Pulsar message key of the first message in a batch"
- " to a partition number."
- label = "Partition Strategy"
- }
+producer_retention_period.desc:
+"""The amount of time messages will be buffered while there is no connection to
+ the Pulsar broker. Longer times mean that more memory/disk will be used"""
+producer_retention_period.label:
+"""Retention Period"""
- producer_sync_timeout {
- desc = "Maximum wait time for receiving a receipt from Pulsar when publishing synchronously."
- label = "Sync publish timeout"
- }
+producer_send_buffer.desc:
+"""Fine tune the socket send buffer. The default value is tuned for high throughput."""
+producer_send_buffer.label:
+"""Socket Send Buffer Size"""
- producer_value_template {
- desc = "Template to render Pulsar message value."
- label = "Message Value"
- }
+producer_strategy.desc:
+"""Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.
- pulsar_producer_struct {
- desc = "Configuration for a Pulsar bridge."
- label = "Pulsar Bridge Configuration"
- }
+random: Randomly pick a partition for each message.
+roundrobin: Pick each available producer in turn for each message.
+key_dispatch: Hash Pulsar message key of the first message in a batch
+ to a partition number."""
+producer_strategy.label:
+"""Partition Strategy"""
+
+pulsar_producer_struct.desc:
+"""Configuration for a Pulsar bridge."""
+pulsar_producer_struct.label:
+"""Pulsar Bridge Configuration"""
+
+servers.desc:
+"""A comma separated list of Pulsar URLs in the form scheme://host[:port]
+ for the client to connect to. The supported schemes are pulsar:// (default)
+ and pulsar+ssl://. The default port is 6650."""
+servers.label:
+"""Servers"""
- servers {
- desc = "A comma separated list of Pulsar URLs in the form scheme://host[:port]"
- " for the client to connect to. The supported schemes are pulsar:// (default)"
- " and pulsar+ssl://. The default port is 6650."
- label = "Servers"
- }
}
diff --git a/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon b/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon
new file mode 100644
index 000000000..a359bc755
--- /dev/null
+++ b/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon
@@ -0,0 +1,38 @@
+emqx_bridge_pulsar_pubsub_schema {
+
+action_parameters.desc:
+"""Action specific configs."""
+action_parameters.label:
+"""Action"""
+
+publisher_action.desc:
+"""Publish message to pulsar topic"""
+publisher_action.label:
+"""Publish Action """
+
+producer_sync_timeout.desc:
+"""Maximum wait time for receiving a receipt from Pulsar when publishing synchronously."""
+producer_sync_timeout.label:
+"""Sync publish timeout"""
+
+producer_key_template.desc:
+"""Template to render Pulsar message key."""
+producer_key_template.label:
+"""Message Key"""
+
+producer_value_template.desc:
+"""Template to render Pulsar message value."""
+producer_value_template.label:
+"""Message Value"""
+
+producer_message_opts.desc:
+"""Template to render a Pulsar message."""
+producer_message_opts.label:
+"""Pulsar Message Template"""
+
+producer_pulsar_message.desc:
+"""Template to render a Pulsar message."""
+producer_pulsar_message.label:
+"""Pulsar Message Template"""
+
+}