From d360e7ead1a45bc4e0a1eb2294a7173b781a04f6 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 11 Jan 2021 15:34:56 +0800 Subject: [PATCH] feat(exproto): use client streaming APIs for handler - Use the gRPC client streaming APIs to improve the ConnectionHandler server performance. - Change the 'conn' field type to term binary --- apps/emqx_exhook/rebar.config | 4 +- apps/emqx_exproto/priv/protos/exproto.proto | 10 +- apps/emqx_exproto/rebar.config | 4 +- .../emqx_exproto/src/emqx_exproto_channel.erl | 9 +- apps/emqx_exproto/src/emqx_exproto_gcli.erl | 62 +++++--- apps/emqx_exproto/src/emqx_exproto_gsvr.erl | 4 +- apps/emqx_exproto/test/emqx_exproto_SUITE.erl | 1 - .../test/emqx_exproto_echo_svr.erl | 133 +++++++++++------- 8 files changed, 137 insertions(+), 90 deletions(-) diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config index ebeaddeab..d2e437b8b 100644 --- a/apps/emqx_exhook/rebar.config +++ b/apps/emqx_exhook/rebar.config @@ -1,11 +1,11 @@ %%-*- mode: erlang -*- {plugins, [rebar3_proper, - {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.9.1"}}} + {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.10.0"}}} ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.5.0"}}} + [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.6.0"}}} ]}. {grpc, diff --git a/apps/emqx_exproto/priv/protos/exproto.proto b/apps/emqx_exproto/priv/protos/exproto.proto index 633cc1758..4b567693c 100644 --- a/apps/emqx_exproto/priv/protos/exproto.proto +++ b/apps/emqx_exproto/priv/protos/exproto.proto @@ -47,17 +47,17 @@ service ConnectionHandler { // -- socket layer - rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {}; + rpc OnSocketCreated(stream SocketCreatedRequest) returns (EmptySuccess) {}; - rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {}; + rpc OnSocketClosed(stream SocketClosedRequest) returns (EmptySuccess) {}; - rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {}; + rpc OnReceivedBytes(stream ReceivedBytesRequest) returns (EmptySuccess) {}; // -- pub/sub layer - rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {}; + rpc OnTimerTimeout(stream TimerTimeoutRequest) returns (EmptySuccess) {}; - rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {}; + rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {}; } message EmptySuccess { } diff --git a/apps/emqx_exproto/rebar.config b/apps/emqx_exproto/rebar.config index 9dd5a2090..88831ce15 100644 --- a/apps/emqx_exproto/rebar.config +++ b/apps/emqx_exproto/rebar.config @@ -9,11 +9,11 @@ {parse_transform}]}. {plugins, [rebar3_proper, - {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.9.1"}}} + {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.10.0"}}} ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.5.0"}}} + [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.6.0"}}} ]}. {grpc, diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 4865cca58..49ded91ff 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -364,7 +364,8 @@ handle_info({sock_closed, Reason}, case queue:len(Queue) =:= 0 andalso Inflight =:= undefined of true -> - {shutdown, {sock_closed, Reason}, Channel}; + Channel1 = ensure_disconnected({sock_closed, Reason}, Channel), + {shutdown, {sock_closed, Reason}, Channel1}; _ -> %% delayed close process for flushing all callback funcs to gRPC server Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}}, @@ -372,9 +373,9 @@ handle_info({sock_closed, Reason}, {ok, ensure_disconnected({sock_closed, Reason}, Channel2)} end; -handle_info({hreply, on_socket_created, {ok, _}}, Channel) -> +handle_info({hreply, on_socket_created, ok}, Channel) -> dispatch_or_close_process(Channel#channel{inflight = undefined}); -handle_info({hreply, FunName, {ok, _}}, Channel) +handle_info({hreply, FunName, ok}, Channel) when FunName == on_socket_closed; FunName == on_received_bytes; FunName == on_received_messages; @@ -525,7 +526,7 @@ interval(alive_timer, #channel{keepalive = Keepalive}) -> %%-------------------------------------------------------------------- wrap(Req) -> - Req#{conn => pid_to_list(self())}. + Req#{conn => base64:encode(term_to_binary(self()))}. dispatch_or_close_process(Channel = #channel{ rqueue = Queue, diff --git a/apps/emqx_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_exproto/src/emqx_exproto_gcli.erl index 69784b70c..90e5d75a3 100644 --- a/apps/emqx_exproto/src/emqx_exproto_gcli.erl +++ b/apps/emqx_exproto/src/emqx_exproto_gcli.erl @@ -37,6 +37,12 @@ , code_change/3 ]). +-record(state, { + pool, + id, + streams + }). + -define(CONN_ADAPTER_MOD, emqx_exproto_v_1_connection_handler_client). %%-------------------------------------------------------------------- @@ -68,32 +74,34 @@ pick(Conn) -> init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #{pool => Pool, id => Id}}. + {ok, #state{pool = Pool, id = Id, streams = #{}}}. handle_call(_Request, _From, State) -> {reply, ok, State}. -handle_cast({rpc, Fun, Req, Options, From}, State) -> - try - case apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of - {ok, Resp, _Metadata} -> - ?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]), - reply(From, Fun, {ok, Resp}); - {error, {Code, Msg}, _Metadata} -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", - [?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]), - reply(From, Fun, {error, {Code, Msg}}); - {error, Reason} -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", - [?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]), - reply(From, Fun, {error, Reason}) - end - catch _ : Rsn : Stk -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p", - [?CONN_ADAPTER_MOD, Fun, Req, Options, Rsn, Stk]), - reply(From, Fun, {error, Rsn}) - end, - {noreply, State}. +handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) -> + case ensure_stream_opened(Fun, Options, Streams) of + {error, Reason} -> + ?LOG(error, "CALL ~0p:~0p(~0p) failed, reason: ~0p", + [?CONN_ADAPTER_MOD, Fun, Options, Reason]), + reply(From, Fun, {error, Reason}), + {noreply, State#state{streams = Streams#{Fun => undefined}}}; + {ok, Stream} -> + case catch grpc_client:send(Stream, Req) of + ok -> + ?LOG(debug, "Send to ~p method successfully, request: ~0p", [Fun, Req]), + reply(From, Fun, ok), + {noreply, State#state{streams = Streams#{Fun => Stream}}}; + {'EXIT', {timeout, _Stk}} -> + ?LOG(error, "Send to ~p method timeout, request: ~0p", [Fun, Req]), + reply(From, Fun, {error, timeout}), + {noreply, State#state{streams = Streams#{Fun => Stream}}}; + {'EXIT', {Reason1, _Stk}} -> + ?LOG(error, "Send to ~p method failure, request: ~0p, stacktrace: ~0p", [Fun, Req, _Stk]), + reply(From, Fun, {error, Reason1}), + {noreply, State#state{streams = Streams#{Fun => undefined}}} + end + end. handle_info(_Info, State) -> {noreply, State}. @@ -111,3 +119,13 @@ code_change(_OldVsn, State, _Extra) -> reply(Pid, Fun, Result) -> Pid ! {hreply, Fun, Result}, ok. + +ensure_stream_opened(Fun, Options, Streams) -> + case maps:get(Fun, Streams, undefined) of + undefined -> + case apply(?CONN_ADAPTER_MOD, Fun, [Options]) of + {ok, Stream} -> {ok, Stream}; + {error, Reason} -> {error, Reason} + end; + Stream -> {ok, Stream} + end. diff --git a/apps/emqx_exproto/src/emqx_exproto_gsvr.erl b/apps/emqx_exproto/src/emqx_exproto_gsvr.erl index 286784009..c1007ee1d 100644 --- a/apps/emqx_exproto/src/emqx_exproto_gsvr.erl +++ b/apps/emqx_exproto/src/emqx_exproto_gsvr.erl @@ -115,10 +115,10 @@ unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) -> %%-------------------------------------------------------------------- to_pid(ConnStr) -> - list_to_pid(binary_to_list(ConnStr)). + binary_to_term(base64:decode(ConnStr)). call(ConnStr, Req) -> - case catch to_pid(ConnStr) of + case catch to_pid(ConnStr) of {'EXIT', {badarg, _}} -> {error, ?RESP_PARAMS_TYPE_ERROR, <<"The conn type error">>}; diff --git a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl index dc6a25c06..e3cab1792 100644 --- a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl @@ -239,7 +239,6 @@ t_hook_connected_disconnected(Cfg) -> emqx:hook('client.connected', HookFun1), emqx:hook('client.disconnected', HookFun2), - send(Sock, ConnBin), {ok, ConnAckBin} = recv(Sock, 5000), diff --git a/apps/emqx_exproto/test/emqx_exproto_echo_svr.erl b/apps/emqx_exproto/test/emqx_exproto_echo_svr.erl index 3742a29a8..031093b50 100644 --- a/apps/emqx_exproto/test/emqx_exproto_echo_svr.erl +++ b/apps/emqx_exproto/test/emqx_exproto_echo_svr.erl @@ -40,6 +40,8 @@ , on_received_messages/2 ]). +-define(LOG(Fmt, Args), io:format(standard_error, Fmt, Args)). + -define(HTTP, #{grpc_opts => #{service_protos => [emqx_exproto_pb], services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}}, listen_opts => #{port => 9001, @@ -48,23 +50,44 @@ transport_opts => #{ssl => false}}). -define(CLIENT, emqx_exproto_v_1_connection_adapter_client). --define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})). --define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})). --define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})). --define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})). --define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})). --define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})). --define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})). --define(TYPE_CONNECT, 1). --define(TYPE_CONNACK, 2). --define(TYPE_PUBLISH, 3). --define(TYPE_PUBACK, 4). --define(TYPE_SUBSCRIBE, 5). --define(TYPE_SUBACK, 6). +-define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})). +-define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})). +-define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})). +-define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})). +-define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})). +-define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})). +-define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})). + +-define(TYPE_CONNECT, 1). +-define(TYPE_CONNACK, 2). +-define(TYPE_PUBLISH, 3). +-define(TYPE_PUBACK, 4). +-define(TYPE_SUBSCRIBE, 5). +-define(TYPE_SUBACK, 6). -define(TYPE_UNSUBSCRIBE, 7). --define(TYPE_UNSUBACK, 8). --define(TYPE_DISCONNECT, 9). +-define(TYPE_UNSUBACK, 8). +-define(TYPE_DISCONNECT, 9). + +-define(loop_recv_and_reply_empty_success(Stream), + ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end)). + +-define(loop_recv_and_reply_empty_success(Stream, Fun), + begin + LoopRecv = fun _Lp(_St) -> + case grpc_stream:recv(_St) of + {more, _Reqs, _NSt} -> + ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]), + Fun(_Reqs), _Lp(_NSt); + {eos, _Reqs, _NSt} -> + ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]), + Fun(_Reqs), _NSt + end + end, + NStream = LoopRecv(Stream), + grpc_stream:reply(NStream, #{}), + {ok, NStream} + end). %%-------------------------------------------------------------------- %% APIs @@ -92,47 +115,53 @@ stop([_ChannPid, _SvrPid]) -> %% Protocol Adapter callbacks %%-------------------------------------------------------------------- --spec on_socket_created(emqx_exproto_pb:socket_created_request(), grpc:metadata()) - -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_socket_created(Req, Md) -> - io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. +-spec on_socket_created(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_socket_created(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream). --spec on_socket_closed(emqx_exproto_pb:socket_closed_request(), grpc:metadata()) - -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_socket_closed(Req, Md) -> - io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. +-spec on_socket_closed(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_socket_closed(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream). --spec on_received_bytes(emqx_exproto_pb:received_bytes_request(), grpc:metadata()) - -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_received_bytes(Req = #{conn := Conn, bytes := Bytes}, Md) -> - io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), - #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]), - _ = handle_in(Conn, Type, Params), - {ok, #{}, Md}. +-spec on_received_bytes(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_received_bytes(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream, + fun(Reqs) -> + lists:foreach( + fun(#{conn := Conn, bytes := Bytes}) -> + #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]), + _ = handle_in(Conn, Type, Params) + end, Reqs) + end). --spec on_timer_timeout(emqx_exproto_pb:timer_timeout_request(), grpc:metadata()) - -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_timer_timeout(Req = #{conn := Conn, type := 'KEEPALIVE'}, Md) -> - io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), - handle_out(Conn, ?TYPE_DISCONNECT), - ?close(#{conn => Conn}), - {ok, #{}, Md}. +-spec on_timer_timeout(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_timer_timeout(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream, + fun(Reqs) -> + lists:foreach( + fun(#{conn := Conn, type := 'KEEPALIVE'}) -> + ?LOG("Close this connection ~p due to keepalive timeout", [Conn]), + handle_out(Conn, ?TYPE_DISCONNECT), + ?close(#{conn => Conn}) + end, Reqs) + end). --spec on_received_messages(emqx_exproto_pb:received_messages_request(), grpc:metadata()) - -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_received_messages(Req = #{conn := Conn, messages := Messages}, Md) -> - io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), - lists:foreach(fun(Message) -> - handle_out(Conn, ?TYPE_PUBLISH, Message) - end, Messages), - {ok, #{}, Md}. +-spec on_received_messages(grpc_stream:stream(), grpc:metadata()) + -> {ok, grpc_stream:stream()}. +on_received_messages(Stream, _Md) -> + ?loop_recv_and_reply_empty_success(Stream, + fun(Reqs) -> + lists:foreach( + fun(#{conn := Conn, messages := Messages}) -> + lists:foreach(fun(Message) -> + handle_out(Conn, ?TYPE_PUBLISH, Message) + end, Messages) + end, Reqs) + end). %%-------------------------------------------------------------------- %% The Protocol Example: