From c64751a53ceba544bf00c7291b43dfc14073e7fd Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 15 Jun 2019 12:55:13 +0800 Subject: [PATCH 01/13] Support module level logger header --- include/logger.hrl | 4 +++- src/emqx_logger.erl | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/include/logger.hrl b/include/logger.hrl index 503b6cf21..1cf5facc6 100644 --- a/include/logger.hrl +++ b/include/logger.hrl @@ -14,6 +14,8 @@ %% debug | info | notice | warning | error | critical | alert | emergency +-compile({parse_transform, emqx_logger}). + -define(DEBUG(Format), ?LOG(debug, Format, [])). -define(DEBUG(Format, Args), ?LOG(debug, Format, Args)). @@ -39,5 +41,5 @@ -define(LOG(Level, Format, Args), begin - (logger:log(Level,#{},#{report_cb => fun(_) -> {(Format), (Args)} end})) + (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end})) end). diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index b77fa2d70..0a663de20 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -48,6 +48,8 @@ , get_log_handler/1 ]). +-export([parse_transform/2]). + %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -120,6 +122,9 @@ set_log_level(Level) -> {error, Error} -> {error, {primary_logger_level, Error}} end. +parse_transform(AST, _Opts) -> + trans(AST, "", []). + %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ @@ -160,3 +165,26 @@ rollback([{ID, Level} | List]) -> rollback(List); rollback([]) -> ok. +%% Generate a function '$logger_header'/0 into the source code +trans([], LogHeader, ResAST) -> + lists:reverse([header_fun(LogHeader) | ResAST]); +trans([{eof, L} | AST], LogHeader, ResAST) -> + lists:reverse([{eof, L}, header_fun(LogHeader) | ResAST]) ++ AST; +trans([{attribute, _, module, _Mod} = M | AST], Header, ResAST) -> + trans(AST, Header, [export_header_fun(), M | ResAST]); +trans([{attribute, _, logger_header, Header} | AST], _, ResAST) -> + trans(AST, Header, ResAST); +trans([F | AST], LogHeader, ResAST) -> + trans(AST, LogHeader, [F | ResAST]). + +export_header_fun() -> + {attribute,erl_anno:new(0),export,[{'$logger_header',0}]}. + +header_fun(LogHeader) -> + L = erl_anno:new(0), + {function,L,'$logger_header',0, + [{clause,L, + [], [], [{string,L,pad(LogHeader)}]}]}. + +pad("") -> ""; +pad(Str) -> Str ++ " ". From 70927482663df680a11fb7785d88e22aab10dbf1 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 15 Jun 2019 12:55:55 +0800 Subject: [PATCH 02/13] Improve log messages using logger header --- src/emqx.erl | 4 +++- src/emqx_alarm_handler.erl | 8 ++++--- src/emqx_banned.erl | 8 ++++--- src/emqx_bridge.erl | 16 ++++++++------ src/emqx_bridge_connect.erl | 4 +++- src/emqx_bridge_sup.erl | 4 +++- src/emqx_broker.erl | 16 ++++++++------ src/emqx_broker_helper.erl | 8 ++++--- src/emqx_channel.erl | 22 ++++++++++--------- src/emqx_client.erl | 28 ++++++++++++------------ src/emqx_cm.erl | 8 ++++--- src/emqx_ctl.erl | 12 ++++++----- src/emqx_hooks.erl | 8 ++++--- src/emqx_metrics.erl | 12 ++++++----- src/emqx_mod_acl_internal.erl | 4 +++- src/emqx_mod_presence.erl | 6 ++++-- src/emqx_modules.erl | 4 +++- src/emqx_mountpoint.erl | 4 +++- src/emqx_os_mon.erl | 4 +++- src/emqx_plugins.erl | 36 ++++++++++++++++--------------- src/emqx_pool.erl | 10 +++++---- src/emqx_protocol.erl | 22 ++++++++++--------- src/emqx_psk.erl | 6 ++++-- src/emqx_router.erl | 8 ++++--- src/emqx_router_helper.erl | 10 +++++---- src/emqx_session.erl | 40 ++++++++++++++++++----------------- src/emqx_session_sup.erl | 10 +++++---- src/emqx_shared_sub.erl | 10 +++++---- src/emqx_sm.erl | 12 ++++++----- src/emqx_sm_registry.erl | 8 ++++--- src/emqx_stats.erl | 14 ++++++------ src/emqx_sys.erl | 8 ++++--- src/emqx_sys_mon.erl | 20 ++++++++++-------- src/emqx_tracer.erl | 16 ++++++++------ src/emqx_ws_channel.erl | 30 ++++++++++++++------------ src/emqx_zone.erl | 8 ++++--- 36 files changed, 260 insertions(+), 188 deletions(-) diff --git a/src/emqx.erl b/src/emqx.erl index c126b262c..3c7622887 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -18,6 +18,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[EMQ X]"). + %% Start/Stop the application -export([ start/0 , restart/1 @@ -183,7 +185,7 @@ shutdown() -> shutdown(normal). shutdown(Reason) -> - ?LOG(critical, "[EMQ X] emqx shutdown for ~s", [Reason]), + ?LOG(critical, "emqx shutdown for ~s", [Reason]), emqx_alarm_handler:unload(), emqx_plugins:unload(), lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]). diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index e7d74eee8..51e112779 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Alarm Handler]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -93,17 +95,17 @@ init(_) -> handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) -> handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State); handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> - ?LOG(warning, "[Alarm Handler] ~p set", [Alarm]), + ?LOG(warning, "~p set", [Alarm]), case encode_alarm(Alarm) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json)); {error, Reason} -> - ?LOG(error, "[Alarm Handler] Failed to encode alarm: ~p", [Reason]) + ?LOG(error, "Failed to encode alarm: ~p", [Reason]) end, set_alarm_(AlarmId, AlarmDesc), {ok, State}; handle_event({clear_alarm, AlarmId}, State) -> - ?LOG(notice, "[Alarm Handler] ~p clear", [AlarmId]), + ?LOG(notice, "~p clear", [AlarmId]), emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)), clear_alarm_(AlarmId), {ok, State}; diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 126562401..11378461c 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Banned]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -88,11 +90,11 @@ init([]) -> {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - ?LOG(error, "[Banned] unexpected call: ~p", [Req]), + ?LOG(error, "unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Banned] unexpected msg: ~p", [Msg]), + ?LOG(error, "unexpected msg: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> @@ -100,7 +102,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> - ?LOG(error, "[Banned] unexpected info: ~p", [Info]), + ?LOG(error, "unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{expiry_timer := TRef}) -> diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index a01837659..5b5ab36bd 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -113,6 +113,8 @@ -include("logger.hrl"). -include("emqx_mqtt.hrl"). +-logger_header("[Bridge]"). + %% same as default in-flight limit for emqx_client -define(DEFAULT_BATCH_COUNT, 32). -define(DEFAULT_BATCH_BYTES, 1 bsl 20). @@ -304,7 +306,7 @@ standing_by({call, From}, ensure_started, State) -> standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; standing_by(info, Info, State) -> - ?LOG(info, "[Bridge] Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), + ?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), {keep_state_and_data, State}; standing_by(Type, Content, State) -> common(standing_by, Type, Content, State). @@ -360,7 +362,7 @@ connected(info, {disconnected, ConnRef, Reason}, #{conn_ref := ConnRefCurrent} = State) -> case ConnRefCurrent =:= ConnRef of true -> - ?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]), + ?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]), {next_state, connecting, State#{conn_ref => undefined, connection => undefined}}; false -> @@ -372,7 +374,7 @@ connected(info, {batch_ack, Ref}, State) -> keep_state_and_data; bad_order -> %% try re-connect then re-send - ?LOG(error, "[Bridge] Bad order ack received by bridge ~p", [name()]), + ?LOG(error, "Bad order ack received by bridge ~p", [name()]), {next_state, connecting, disconnect(State)}; {ok, NewState} -> {keep_state, NewState, ?maybe_send} @@ -403,7 +405,7 @@ common(_StateName, info, {dispatch, _, Msg}, NewQ = replayq:append(Q, collect([Msg])), {keep_state, State#{replayq => NewQ}, ?maybe_send}; common(StateName, Type, Content, State) -> - ?LOG(notice, "[Bridge] Bridge ~p discarded ~p type event at state ~p:\n~p", + ?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:\n~p", [name(), Type, StateName, Content]), {keep_state, State}. @@ -459,7 +461,7 @@ do_connect(Type, StateName, #{ forwards := Forwards end, case ConnectFun(Subs) of {ok, ConnRef, Conn} -> - ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), + ?LOG(info, "Bridge ~p connected", [name()]), State0 = State#{conn_ref => ConnRef, connection => Conn}, State1 = eval_bridge_handler(State0, connected), StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]}, @@ -515,7 +517,7 @@ retry_inflight(#{inflight := Inflight} = State, {ok, NewState} -> retry_inflight(NewState, T); {error, Reason} -> - ?LOG(error, "[Bridge] Inflight retry failed\n~p", [Reason]), + ?LOG(error, "Inflight retry failed\n~p", [Reason]), {error, State#{inflight := Inflight ++ Remain}} end. @@ -546,7 +548,7 @@ do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) -> batch => Batch}], {ok, State#{inflight := NewInflight}}; {error, Reason} -> - ?LOG(info, "[Bridge] Batch produce failed\n~p", [Reason]), + ?LOG(info, "Batch produce failed\n~p", [Reason]), {error, State} end. diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl index 8685451ae..480129903 100644 --- a/src/emqx_bridge_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -31,6 +31,8 @@ -include("logger.hrl"). +-logger_header("[Bridge Connect]"). + %% establish the connection to remote node/cluster %% protal worker (the caller process) should be expecting %% a message {disconnected, conn_ref()} when disconnected. @@ -54,7 +56,7 @@ start(Module, Config) -> {ok, Ref, Conn}; {error, Reason} -> Config1 = obfuscate(Config), - ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n" + ?LOG(error, "Failed to connect with module=~p\n" "config=~p\nreason:~p", [Module, Config1, Reason]), {error, Reason} end. diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index a40e7b2e3..baf86fd70 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -17,6 +17,8 @@ -include("logger.hrl"). +-logger_header("[Bridge]"). + %% APIs -export([ start_link/0 , start_link/1 @@ -74,6 +76,6 @@ drop_bridge(Id) -> ok -> supervisor:delete_child(?SUP, Id); Error -> - ?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]), + ?LOG(error, "Delete bridge failed, error : ~p", [Error]), Error end. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index cccc0bcd7..b9a4eb1bd 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Broker]"). + -export([start_link/2]). %% PubSub @@ -195,7 +197,7 @@ publish(Msg) when is_record(Msg, message) -> Headers = Msg#message.headers, case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of #message{headers = #{allow_publish := false}} -> - ?LOG(notice, "[Broker] Publishing interrupted: ~s", [emqx_message:format(Msg)]), + ?LOG(notice, "Publishing interrupted: ~s", [emqx_message:format(Msg)]), []; #message{topic = Topic} = Msg1 -> Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), @@ -209,7 +211,7 @@ safe_publish(Msg) when is_record(Msg, message) -> publish(Msg) catch _:Error:Stacktrace -> - ?LOG(error, "[Broker] Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) + ?LOG(error, "Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) after ok end. @@ -256,7 +258,7 @@ forward(Node, To, Delivery) -> %% rpc:call to ensure the delivery, but the latency:( case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> - ?LOG(error, "[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]), + ?LOG(error, "Failed to forward msg to ~s: ~p", [Node, Reason]), Delivery; Delivery1 -> Delivery1 end. @@ -424,14 +426,14 @@ handle_call({subscribe, Topic, I}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Broker] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({subscribe, Topic}, State) -> case emqx_router:do_add_route(Topic) of ok -> ok; {error, Reason} -> - ?LOG(error, "[Broker] Failed to add route: ~p", [Reason]) + ?LOG(error, "Failed to add route: ~p", [Reason]) end, {noreply, State}; @@ -454,11 +456,11 @@ handle_cast({unsubscribed, Topic, I}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[Broker] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Broker] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index 8ece1805b..9e9b337cf 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Broker Helper]"). + -export([start_link/0]). %% APIs @@ -110,7 +112,7 @@ init([]) -> {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?LOG(error, "[Broker Helper] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> @@ -119,7 +121,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}}; handle_cast(Msg, State) -> - ?LOG(error, "[Broker Helper] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) -> @@ -130,7 +132,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) {noreply, State#{pmon := PMon1}}; handle_info(Info, State) -> - ?LOG(error, "[Broker Helper] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 7e7b7f4c1..2a16bb348 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -22,6 +22,8 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). +-logger_header("[Channel]"). + -export([start_link/3]). %% APIs @@ -288,18 +290,18 @@ handle({call, From}, session, State = #state{proto_state = ProtoState}) -> reply(From, emqx_protocol:session(ProtoState), State); handle({call, From}, Req, State) -> - ?LOG(error, "[Channel] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), reply(From, ignored, State); %% Handle cast handle(cast, Msg, State) -> - ?LOG(error, "[Channel] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {keep_state, State}; %% Handle Incoming handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> Oct = iolist_size(Data), - ?LOG(debug, "[Channel] RECV ~p", [Data]), + ?LOG(debug, "RECV ~p", [Data]), emqx_pd:update_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), NState = ensure_stats_timer(maybe_gc({1, Oct}, State)), @@ -348,23 +350,23 @@ handle(info, {timeout, Timer, emit_stats}, GcState1 = emqx_gc:reset(GcState), {keep_state, NState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> - ?LOG(error, "[Channel] Shutdown exceptionally due to ~p", [Reason]), + ?LOG(error, "Shutdown exceptionally due to ~p", [Reason]), shutdown(Reason, NState) end; handle(info, {shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(error, "[Channel] Discarded by ~s:~p", [ClientId, ByPid]), + ?LOG(error, "Discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "[Channel] Clientid '~s' conflict with ~p", [ClientId, NewPid]), + ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); handle(info, {shutdown, Reason}, State) -> shutdown(Reason, State); handle(info, Info, State) -> - ?LOG(error, "[Channel] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {keep_state, State}. code_change(_Vsn, State, Data, _Extra) -> @@ -374,7 +376,7 @@ terminate(Reason, _StateName, #state{transport = Transport, socket = Socket, keepalive = KeepAlive, proto_state = ProtoState}) -> - ?LOG(debug, "[Channel] Terminated for ~p", [Reason]), + ?LOG(debug, "Terminated for ~p", [Reason]), Transport:fast_close(Socket), emqx_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -403,7 +405,7 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> shutdown(Reason, State) catch error:Reason:Stk -> - ?LOG(error, "[Channel] Parse failed for ~p~n\ + ?LOG(error, "Parse failed for ~p~n\ Stacktrace:~p~nError data:~p", [Reason, Stk, Data]), shutdown(parse_error, State) end. @@ -445,7 +447,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> - ?LOG(debug, "[Channel] Rate limit pause connection ~pms", [Pause]), + ?LOG(debug, "Rate limit pause connection ~pms", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1) end. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index e6dbea27b..41ff2f72e 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -22,6 +22,8 @@ -include("types.hrl"). -include("emqx_client.hrl"). +-logger_header("[Client]"). + -export([ start_link/0 , start_link/1 ]). @@ -794,10 +796,10 @@ connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) - Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), State#state{inflight = Inflight1}; {value, {pubrel, _Ref, _Ts}} -> - ?LOG(notice, "[Client] Duplicated PUBREC Packet: ~p", [PacketId]), + ?LOG(notice, "Duplicated PUBREC Packet: ~p", [PacketId]), State; none -> - ?LOG(warning, "[Client] Unexpected PUBREC Packet: ~p", [PacketId]), + ?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId]), State end); @@ -812,7 +814,7 @@ connected(cast, ?PUBREL_PACKET(PacketId), false -> {keep_state, NewState} end; error -> - ?LOG(warning, "[Client] Unexpected PUBREL: ~p", [PacketId]), + ?LOG(warning, "Unexpected PUBREL: ~p", [PacketId]), keep_state_and_data end; @@ -911,37 +913,37 @@ handle_event({call, From}, stop, _StateName, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> - ?LOG(debug, "[Client] RECV Data: ~p", [Data]), + ?LOG(debug, "RECV Data: ~p", [Data]), process_incoming(Data, [], run_sock(State)); handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> - ?LOG(error, "[Client] The connection error occured ~p, reason:~p", [Error, Reason]), + ?LOG(error, "The connection error occured ~p, reason:~p", [Error, Reason]), {stop, {shutdown, Reason}, State}; handle_event(info, {Closed, _Sock}, _StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> - ?LOG(debug, "[Client] ~p", [Closed]), + ?LOG(debug, "~p", [Closed]), {stop, {shutdown, Closed}, State}; handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) -> - ?LOG(debug, "[Client] Got EXIT from owner, Reason: ~p", [Reason]), + ?LOG(debug, "Got EXIT from owner, Reason: ~p", [Reason]), {stop, {shutdown, Reason}, State}; handle_event(info, {inet_reply, _Sock, ok}, _, _State) -> keep_state_and_data; handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> - ?LOG(error, "[Client] Got tcp error: ~p", [Reason]), + ?LOG(error, "Got tcp error: ~p", [Reason]), {stop, {shutdown, Reason}, State}; handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> - ?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)", + ?LOG(info, "State: ~s, Unexpected Event: (info, ~p)", [StateName, EventContent]), keep_state_and_data; handle_event(EventType, EventContent, StateName, _StateData) -> - ?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)", + ?LOG(error, "State: ~s, Unexpected Event: (~p, ~p)", [StateName, EventType, EventContent]), keep_state_and_data. @@ -984,7 +986,7 @@ delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties), properties => Properties}), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - ?LOG(warning, "[Client] Unexpected PUBACK: ~p", [PacketId]), + ?LOG(warning, "Unexpected PUBACK: ~p", [PacketId]), State end; delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), @@ -996,7 +998,7 @@ delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), properties => Properties}), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - ?LOG(warning, "[Client] Unexpected PUBCOMP Packet: ~p", [PacketId]), + ?LOG(warning, "Unexpected PUBCOMP Packet: ~p", [PacketId]), State end. @@ -1200,7 +1202,7 @@ send(Msg, State) when is_record(Msg, mqtt_msg) -> send(Packet, State = #state{socket = Sock, proto_ver = Ver}) when is_record(Packet, mqtt_packet) -> Data = emqx_frame:serialize(Packet, #{version => Ver}), - ?LOG(debug, "[Client] SEND Data: ~1000p", [Packet]), + ?LOG(debug, "SEND Data: ~1000p", [Packet]), case emqx_client_sock:send(Sock, Data) of ok -> {ok, bump_last_packet_id(State)}; Error -> Error diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index d78e99465..852338854 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[CM]"). + -export([start_link/0]). -export([ register_connection/1 @@ -159,7 +161,7 @@ init([]) -> {ok, #{conn_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?LOG(error, "[CM] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) -> @@ -169,7 +171,7 @@ handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) -> {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}}; handle_cast(Msg, State) -> - ?LOG(error, "[CM] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) -> @@ -180,7 +182,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon} {noreply, State#{conn_pmon := PMon1}}; handle_info(Info, State) -> - ?LOG(error, "[CM] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 4746175b3..09bab3e58 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -18,6 +18,8 @@ -include("logger.hrl"). +-logger_header("[Ctl]"). + -export([start_link/0]). -export([ register_command/2 @@ -79,7 +81,7 @@ run_command(Cmd, Args) when is_atom(Cmd) -> _ -> ok catch _:Reason:Stacktrace -> - ?ERROR("[Ctl] CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]), + ?ERROR("CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]), {error, Reason} end; [] -> @@ -107,14 +109,14 @@ init([]) -> {ok, #state{seq = 0}}. handle_call(Req, _From, State) -> - ?LOG(error, "[Ctl] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({register_command, Cmd, MF, Opts}, State = #state{seq = Seq}) -> case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of [] -> ets:insert(?TAB, {{Seq, Cmd}, MF, Opts}); [[OriginSeq] | _] -> - ?LOG(warning, "[Ctl] CMD ~s is overidden by ~p", [Cmd, MF]), + ?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]), ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts}) end, noreply(next_seq(State)); @@ -124,11 +126,11 @@ handle_cast({unregister_command, Cmd}, State) -> noreply(State); handle_cast(Msg, State) -> - ?LOG(error, "[Ctl] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), noreply(State). handle_info(Info, State) -> - ?LOG(error, "[Ctl] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), noreply(State). terminate(_Reason, _State) -> diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 6d448d2f3..594aacbec 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Hooks]"). + -export([start_link/0, stop/0]). %% Hooks API @@ -181,7 +183,7 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat {reply, Reply, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Hooks] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({del, HookPoint, Action}, State) -> @@ -194,11 +196,11 @@ handle_cast({del, HookPoint, Action}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[Hooks] Unexpected msg: ~p", [Msg]), + ?LOG(error, "Unexpected msg: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Hooks] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index cf5939d38..fdf66a493 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -20,6 +20,8 @@ -include("types.hrl"). -include("emqx_mqtt.hrl"). +-logger_header("[Metrics]"). + -export([ start_link/0 , stop/0 ]). @@ -357,13 +359,13 @@ init([]) -> {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> - ?LOG(error, "[Metrics] Failed to create ~s:~s for index exceeded.", [Type, Name]), + ?LOG(error, "Failed to create ~s:~s for index exceeded.", [Type, Name]), {reply, {error, metric_index_exceeded}, State}; handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) -> case ets:lookup(?TAB, Name) of [#metric{idx = Idx}] -> - ?LOG(warning, "[Metrics] ~s already exists.", [Name]), + ?LOG(warning, "~s already exists.", [Name]), {reply, {ok, Idx}, State}; [] -> Metric = #metric{name = Name, type = Type, idx = NextIdx}, @@ -372,15 +374,15 @@ handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) -> end; handle_call(Req, _From, State) -> - ?LOG(error, "[Metrics] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Metrics] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Metrics] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 953666f31..9c88c9ea2 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[ACL_INTERNAL]"). + %% APIs -export([ all_rules/0 , check_acl/5 @@ -99,7 +101,7 @@ rules_from_file(AclFile) -> #{publish => [Rule || Rule <- Rules, filter(publish, Rule)], subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]}; {error, Reason} -> - ?LOG(alert, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), + ?LOG(alert, "Failed to read ~s: ~p", [AclFile, Reason]), #{} end. diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 9789474d7..cab5076f5 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Presence]"). + %% APIs -export([ on_client_connected/4 , on_client_disconnected/3 @@ -54,7 +56,7 @@ on_client_connected(#{client_id := ClientId, {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> - ?LOG(error, "[Presence] Encoding connected event error: ~p", [Reason]) + ?LOG(error, "Encoding connected event error: ~p", [Reason]) end. on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) -> @@ -65,7 +67,7 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E {ok, Payload} -> emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload)); {error, Reason} -> - ?LOG(error, "[Presence] Encoding disconnected event error: ~p", [Reason]) + ?LOG(error, "Encoding disconnected event error: ~p", [Reason]) end. unload(_Env) -> diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index f9d74dc81..79e2c02b4 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -16,6 +16,8 @@ -include("logger.hrl"). +-logger_header("[Modules]"). + -export([ load/0 , unload/0 ]). @@ -26,7 +28,7 @@ load() -> lists:foreach( fun({Mod, Env}) -> ok = Mod:load(Env), - ?LOG(info, "[Modules] Load ~s module successfully.", [Mod]) + ?LOG(info, "Load ~s module successfully.", [Mod]) end, emqx_config:get_env(modules, [])). -spec(unload() -> ok). diff --git a/src/emqx_mountpoint.erl b/src/emqx_mountpoint.erl index b98348395..66b8da057 100644 --- a/src/emqx_mountpoint.erl +++ b/src/emqx_mountpoint.erl @@ -17,6 +17,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Mountpoint]"). + -export([ mount/2 , unmount/2 ]). @@ -46,7 +48,7 @@ unmount(MountPoint, Msg = #message{topic = Topic}) -> {MountPoint, Topic1} -> Msg#message{topic = Topic1} catch _Error:Reason -> - ?LOG(error, "[Mountpoint] Unmount error : ~p", [Reason]), + ?LOG(error, "Unmount error : ~p", [Reason]), Msg end. diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 2a8eb3ca3..2b58a3749 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -18,6 +18,8 @@ -include("logger.hrl"). +-logger_header("[OS Monitor]"). + -export([start_link/1]). %% gen_server callbacks @@ -132,7 +134,7 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, 0 -> {noreply, State#{timer := undefined}}; {error, Reason} -> - ?LOG(error, "[OS Monitor] Failed to get cpu utilization: ~p", [Reason]), + ?LOG(error, "Failed to get cpu utilization: ~p", [Reason]), {noreply, ensure_check_timer(State)}; Busy when Busy / 100 >= CPUHighWatermark -> alarm_handler:set_alarm({cpu_high_watermark, Busy}), diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 1c1185bf0..b88fce472 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -17,6 +17,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Plugins]"). + -export([init/0]). -export([ load/0 @@ -86,7 +88,7 @@ load_expand_plugin(PluginDir) -> end, Modules), case filelib:wildcard(Ebin ++ "/*.app") of [App|_] -> application:load(list_to_atom(filename:basename(App, ".app"))); - _ -> ?LOG(alert, "[Plugins] Plugin not found."), + _ -> ?LOG(alert, "Plugin not found."), {error, load_app_fail} end. @@ -112,7 +114,7 @@ with_loaded_file(File, SuccFun) -> Names = filter_plugins(Names0), SuccFun(Names); {error, Error} -> - ?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]), + ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]), {error, Error} end. @@ -126,7 +128,7 @@ load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), case NotFound of [] -> ok; - NotFound -> ?LOG(alert, "[Plugins] Cannot find plugins: ~p", [NotFound]) + NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound]) end, NeedToLoad = Names -- NotFound -- names(started_app), [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. @@ -171,12 +173,12 @@ plugin(AppName) -> load(PluginName) when is_atom(PluginName) -> case lists:member(PluginName, names(started_app)) of true -> - ?LOG(notice, "[Plugins] Plugin ~s is already started", [PluginName]), + ?LOG(notice, "Plugin ~s is already started", [PluginName]), {error, already_started}; false -> case find_plugin(PluginName) of false -> - ?LOG(alert, "[Plugins] Plugin ~s not found", [PluginName]), + ?LOG(alert, "Plugin ~s not found", [PluginName]), {error, not_found}; Plugin -> load_plugin(Plugin, true) @@ -204,12 +206,12 @@ load_app(App) -> start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> - ?LOG(info, "[Plugins] Started plugins: ~p", [Started]), - ?LOG(info, "[Plugins] Load plugin ~s successfully", [App]), + ?LOG(info, "Started plugins: ~p", [Started]), + ?LOG(info, "Load plugin ~s successfully", [App]), SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> - ?LOG(error, "[Plugins] Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]), + ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]), {error, {ErrApp, Reason}} end. @@ -226,10 +228,10 @@ unload(PluginName) when is_atom(PluginName) -> {true, true} -> unload_plugin(PluginName, true); {false, _} -> - ?LOG(error, "[Plugins] Plugin ~s is not started", [PluginName]), + ?LOG(error, "Plugin ~s is not started", [PluginName]), {error, not_started}; {true, false} -> - ?LOG(error, "[Plugins] ~s is not a plugin, cannot unload it", [PluginName]), + ?LOG(error, "~s is not a plugin, cannot unload it", [PluginName]), {error, not_found} end. @@ -244,11 +246,11 @@ unload_plugin(App, Persistent) -> stop_app(App) -> case application:stop(App) of ok -> - ?LOG(info, "[Plugins] Stop plugin ~s successfully", [App]), ok; + ?LOG(info, "Stop plugin ~s successfully", [App]), ok; {error, {not_started, App}} -> - ?LOG(error, "[Plugins] Plugin ~s is not started", [App]), ok; + ?LOG(error, "Plugin ~s is not started", [App]), ok; {error, Reason} -> - ?LOG(error, "[Plugins] Stop plugin ~s error: ~p", [App]), {error, Reason} + ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason} end. %%-------------------------------------------------------------------- @@ -276,7 +278,7 @@ plugin_loaded(Name, true) -> ignore end; {error, Error} -> - ?LOG(error, "[Plugins] Cannot read loaded plugins: ~p", [Error]) + ?LOG(error, "Cannot read loaded plugins: ~p", [Error]) end. plugin_unloaded(_Name, false) -> @@ -289,10 +291,10 @@ plugin_unloaded(Name, true) -> true -> write_loaded(lists:delete(Name, Names)); false -> - ?LOG(error, "[Plugins] Cannot find ~s in loaded_file", [Name]) + ?LOG(error, "Cannot find ~s in loaded_file", [Name]) end; {error, Error} -> - ?LOG(error, "[Plugins] Cannot read loaded_plugins: ~p", [Error]) + ?LOG(error, "Cannot read loaded_plugins: ~p", [Error]) end. read_loaded() -> @@ -311,6 +313,6 @@ write_loaded(AppNames) -> file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name]))) end, AppNames); {error, Error} -> - ?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]), + ?LOG(error, "Open File ~p Error: ~p", [File, Error]), {error, Error} end. diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index b78479f5c..5a6231ee0 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Pool]"). + %% APIs -export([start_link/2]). @@ -97,22 +99,22 @@ handle_call({submit, Task}, _From, State) -> {reply, catch run(Task), State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Pool] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({async_submit, Task}, State) -> try run(Task) catch _:Error:Stacktrace -> - ?LOG(error, "[Pool] Error: ~p, ~p", [Error, Stacktrace]) + ?LOG(error, "Error: ~p, ~p", [Error, Stacktrace]) end, {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[Pool] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Pool] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index b6ca03d7f..6eb1bb233 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -20,6 +20,8 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). +-logger_header("[Protocol]"). + -export([ info/1 , attrs/1 , attr/2 @@ -413,11 +415,11 @@ process(?CONNECT_PACKET( %% Success {?RC_SUCCESS, SP, PState4}; {error, Error} -> - ?LOG(error, "[Protocol] Failed to open session: ~p", [Error]), + ?LOG(error, "Failed to open session: ~p", [Error]), {?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}} end; {error, Reason} -> - ?LOG(warning, "[Protocol] Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]), + ?LOG(warning, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]), {emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}} end; {error, ReasonCode} -> @@ -429,7 +431,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState = # ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s", + ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState) @@ -440,7 +442,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState = #p ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), + ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({puback, PacketId, ReasonCode}, PState) of {ok, PState1} -> AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), @@ -454,7 +456,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState = #p ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "[Protocol] Cannot publish qos2 message to ~s for ~s", + ?LOG(warning, "Cannot publish qos2 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({pubrec, PacketId, ReasonCode}, PState) of {ok, PState1} -> @@ -501,7 +503,7 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), ({Topic, #{rc := Code}}, {Topics, Codes}) -> {[Topic|Topics], [Code|Codes]} end, {[], []}, TopicFilters), - ?LOG(warning, "[Protocol] Cannot subscribe ~p for ~p", + ?LOG(warning, "Cannot subscribe ~p for ~p", [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]), case deliver({suback, PacketId, ReasonCodes}, PState) of {ok, PState1} -> @@ -844,7 +846,7 @@ check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, case do_acl_check(EnableAcl, publish, Credentials, WillTopic) of ok -> ok; Other -> - ?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]), + ?LOG(warning, "Cannot publish will message to ~p for acl denied", [WillTopic]), Other end. @@ -898,9 +900,9 @@ check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) -> end, {ok, []}, TopicFilters). trace(recv, Packet) -> - ?LOG(debug, "[Protocol] RECV ~s", [emqx_packet:format(Packet)]); + ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]); trace(send, Packet) -> - ?LOG(debug, "[Protocol] SEND ~s", [emqx_packet:format(Packet)]). + ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]). inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) -> PState#pstate{recv_stats = inc_stats(Type, Stats)}; @@ -926,7 +928,7 @@ terminate(Reason, PState) when Reason =:= conflict; terminate(Reason, PState = #pstate{credentials = Credentials}) -> do_flapping_detect(disconnect, PState), - ?LOG(info, "[Protocol] Shutdown for ~p", [Reason]), + ?LOG(info, "Shutdown for ~p", [Reason]), ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). start_keepalive(0, _PState) -> diff --git a/src/emqx_psk.erl b/src/emqx_psk.erl index ae2f8935b..cb8835e78 100644 --- a/src/emqx_psk.erl +++ b/src/emqx_psk.erl @@ -16,6 +16,8 @@ -include("logger.hrl"). +-logger_header("[PSK]"). + %% SSL PSK Callbacks -export([lookup/3]). @@ -27,10 +29,10 @@ lookup(psk, ClientPSKID, _UserState) -> try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret}; Error -> - ?LOG(error, "[PSK] Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]), + ?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]), error catch Except:Error:Stacktrace -> - ?LOG(error, "[PSK] Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), + ?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), error end. \ No newline at end of file diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 8eb65169c..aefe61667 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -21,6 +21,8 @@ -include("types.hrl"). -include_lib("ekka/include/ekka.hrl"). +-logger_header("[Router]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -198,15 +200,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Router] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Router] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Router] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index f272b1236..aac228f66 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Router Helper]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -103,11 +105,11 @@ init([]) -> {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> - ?LOG(error, "[Router Helper] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Router Helper] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) -> @@ -123,7 +125,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) -> {noreply, State}; handle_info({mnesia_table_event, Event}, State) -> - ?LOG(error, "[Router Helper] Unexpected mnesia_table_event: ~p", [Event]), + ?LOG(error, "Unexpected mnesia_table_event: ~p", [Event]), {noreply, State}; handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> @@ -141,7 +143,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "[Route Helper] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 85d2c1947..49454d8ac 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -47,6 +47,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Session]"). + -export([start_link/1]). -export([ info/1 @@ -399,11 +401,11 @@ handle_call(stats, _From, State) -> reply(stats(State), State); handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) -> - ?LOG(warning, "[Session] Discarded by ~p", [ByPid]), + ?LOG(warning, "Discarded by ~p", [ByPid]), {stop, {shutdown, discarded}, ok, State}; handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> - ?LOG(warning, "[Session] Conn ~p is discarded by ~p", [ConnPid, ByPid]), + ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]), ConnPid ! {shutdown, discard, {ClientId, ByPid}}, {stop, {shutdown, discarded}, ok, State}; @@ -423,7 +425,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From, {ok, ensure_stats_timer(ensure_await_rel_timer(State1))} end; true -> - ?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), + ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), ok = emqx_metrics:inc('messages.qos2.dropped'), {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} end); @@ -435,7 +437,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In true -> {ok, ensure_stats_timer(acked(pubrec, PacketId, State))}; false -> - ?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]), + ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId]), ok = emqx_metrics:inc('packets.pubrec.missed'), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -447,7 +449,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel {_Ts, AwaitingRel1} -> {ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})}; error -> - ?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]), + ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.pubrel.missed'), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -456,7 +458,7 @@ handle_call(close, _From, State) -> {stop, normal, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Session] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. %% SUBSCRIBE: @@ -497,7 +499,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight} true -> ensure_stats_timer(dequeue(acked(puback, PacketId, State))); false -> - ?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]), + ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), State end); @@ -509,7 +511,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight true -> ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State))); false -> - ?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]), + ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.pubcomp.missed'), State end); @@ -527,14 +529,14 @@ handle_cast({resume, #{conn_pid := ConnPid, expiry_timer = ExpireTimer, will_delay_timer = WillDelayTimer}) -> - ?LOG(info, "[Session] Resumed by connection ~p ", [ConnPid]), + ?LOG(info, "Resumed by connection ~p ", [ConnPid]), %% Cancel Timers lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]), case kick(ClientId, OldConnPid, ConnPid) of - ok -> ?LOG(warning, "[Session] Connection ~p kickout ~p", [ConnPid, OldConnPid]); + ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid]); ignore -> ok end, @@ -565,7 +567,7 @@ handle_cast({update_expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; handle_cast(Msg, State) -> - ?LOG(error, "[Session] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) -> @@ -600,12 +602,12 @@ handle_info({timeout, Timer, emit_stats}, GcState1 = emqx_gc:reset(GcState), {noreply, NewState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> - ?LOG(warning, "[Session] Shutdown exceptionally due to ~p", [Reason]), + ?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]), shutdown(Reason, NewState) end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> - ?LOG(info, "[Session] Expired, shutdown now.", []), + ?LOG(info, "Expired, shutdown now.", []), shutdown(expired, State); handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) -> @@ -640,12 +642,12 @@ handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> {noreply, State#state{old_conn_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> - ?LOG(error, "[Session] Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", + ?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", [ConnPid, Pid, Reason]), {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "[Session] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(Reason, #state{will_msg = WillMsg, @@ -771,7 +773,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, case (timer:now_diff(Now, Ts) div 1000) of Age when Age >= Timeout -> ok = emqx_metrics:inc('messages.qos2.expired'), - ?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]), + ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]), expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Age -> ensure_await_rel_timer(Timeout - max(0, Age), State) @@ -981,7 +983,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - ?LOG(warning, "[Session] Duplicated PUBACK PacketId ~w", [PacketId]), + ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]), State end; @@ -991,10 +993,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> - ?LOG(warning, "[Session] Duplicated PUBREC PacketId ~w", [PacketId]), + ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]), State; none -> - ?LOG(warning, "[Session] Unexpected PUBREC PacketId ~w", [PacketId]), + ?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId]), State end; diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index ad721d5fd..b271bd85d 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Session Supervisor]"). + -export([start_link/1]). -export([ start_session/1 @@ -92,7 +94,7 @@ handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From, reply({error, Reason}, State) catch _:Error:Stk -> - ?LOG(error, "[Session Supervisor] Failed to start session ~p: ~p, stacktrace:~n~p", + ?LOG(error, "Failed to start session ~p: ~p, stacktrace:~n~p", [ClientId, Error, Stk]), reply({error, Error}, State) end; @@ -101,11 +103,11 @@ handle_call(count_sessions, _From, State = #state{sessions = SessMap}) -> {reply, maps:size(SessMap), State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Session Supervisor] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Session Supervisor] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) -> @@ -117,7 +119,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow {noreply, State#state{sessions = SessMap1}}; handle_info(Info, State) -> - ?LOG(notice, "[Session Supervisor] Unexpected info: ~p", [Info]), + ?LOG(notice, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, State) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 48eba293a..3b0d48189 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -21,6 +21,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Shared Sub]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -310,11 +312,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Shared Sub] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Shared Sub] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> @@ -329,12 +331,12 @@ handle_info({mnesia_table_event, _Event}, State) -> {noreply, State}; handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> - ?LOG(info, "[Shared Sub] Shared subscriber down: ~p", [SubPid]), + ?LOG(info, "Shared subscriber down: ~p", [SubPid]), cleanup_down(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; handle_info(Info, State) -> - ?LOG(error, "[Shared Sub] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index a122a05fe..256425f5a 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[SM]"). + %% APIs -export([start_link/0]). @@ -114,7 +116,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> try emqx_session:discard(SessPid, ConnPid) catch _:Error:_Stk -> - ?LOG(warning, "[SM] Failed to discard ~p: ~p", [SessPid, Error]) + ?LOG(warning, "Failed to discard ~p: ~p", [SessPid, Error]) end end, lookup_session_pids(ClientId)). @@ -128,7 +130,7 @@ resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) -> {ok, SessPid}; SessPids -> [SessPid|StalePids] = lists:reverse(SessPids), - ?LOG(error, "[SM] More than one session found: ~p", [SessPids]), + ?LOG(error, "More than one session found: ~p", [SessPids]), lists:foreach(fun(StalePid) -> catch emqx_session:discard(StalePid, ConnPid) end, StalePids), @@ -254,15 +256,15 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?LOG(error, "[SM] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[SM] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[SM] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index ccacc9907..5e8d4663d 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Registry]"). + -export([start_link/0]). -export([ is_enabled/0 @@ -96,11 +98,11 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?LOG(error, "[Registry] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Registry] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State) -> @@ -114,7 +116,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "[Registry] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index c75bd742d..a095df8d8 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Stats]"). + %% APIs -export([ start_link/0 , start_link/1 @@ -184,7 +186,7 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Stats] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({setstat, Stat, MaxStat, Val}, State) -> @@ -202,7 +204,7 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) -> handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) -> case lists:keyfind(Name, #update.name, Updates) of #update{} -> - ?LOG(warning, "[Stats] Duplicated update: ~s", [Name]), + ?LOG(warning, "Duplicated update: ~s", [Name]), {noreply, State}; false -> {noreply, State#state{updates = [Update | Updates]}} @@ -212,7 +214,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) -> {noreply, State#state{updates = lists:keydelete(Name, #update.name, Updates)}}; handle_cast(Msg, State) -> - ?LOG(error, "[Stats] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) -> @@ -222,7 +224,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update try UpFun() catch _:Error -> - ?LOG(error, "[Stats] update ~s failed: ~p", [Name, Error]) + ?LOG(error, "update ~s failed: ~p", [Name, Error]) end, [Update#update{countdown = I} | Acc]; (Update = #update{countdown = C}, Acc) -> @@ -231,7 +233,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update {noreply, start_timer(State#state{updates = Updates1}), hibernate}; handle_info(Info, State) -> - ?LOG(error, "[Stats] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> @@ -252,5 +254,5 @@ safe_update_element(Key, Val) -> true catch error:badarg -> - ?LOG(warning, "[Stats] Update ~p to ~p failed", [Key, Val]) + ?LOG(warning, "Update ~p to ~p failed", [Key, Val]) end. diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 90b24e122..37e22856f 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[SYS]"). + -export([start_link/0]). -export([ version/0 @@ -117,11 +119,11 @@ handle_call(uptime, _From, State) -> {reply, uptime(State), State}; handle_call(Req, _From, State) -> - ?LOG(error, "[SYS] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[SYS] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> @@ -138,7 +140,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi {noreply, tick(State), hibernate}; handle_info(Info, State) -> - ?LOG(error, "[SYS] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index b75503b53..f66c2a0c5 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[SYSMON]"). + -export([start_link/1]). %% compress unused warning @@ -88,18 +90,18 @@ parse_opt([_Opt|Opts], Acc) -> parse_opt(Opts, Acc). handle_call(Req, _From, State) -> - ?LOG(error, "[SYSMON] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[SYSMON] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({monitor, Pid, long_gc, Info}, State) -> suppress({long_gc, Pid}, fun() -> WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(long_gc, WarnMsg) end, State); @@ -107,7 +109,7 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> suppress({long_schedule, Pid}, fun() -> WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(long_schedule, WarnMsg) end, State); @@ -115,7 +117,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> suppress({long_schedule, Port}, fun() -> WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), - ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]), safe_publish(long_schedule, WarnMsg) end, State); @@ -123,7 +125,7 @@ handle_info({monitor, Pid, large_heap, Info}, State) -> suppress({large_heap, Pid}, fun() -> WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(large_heap, WarnMsg) end, State); @@ -131,7 +133,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) -> suppress({busy_port, Port}, fun() -> WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), safe_publish(busy_port, WarnMsg) end, State); @@ -139,7 +141,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> suppress({busy_dist_port, Port}, fun() -> WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), safe_publish(busy_dist_port, WarnMsg) end, State); @@ -147,7 +149,7 @@ handle_info({timeout, _Ref, reset}, State) -> {noreply, State#{events := []}, hibernate}; handle_info(Info, State) -> - ?LOG(error, "[SYSMON] Unexpected Info: ~p", [Info]), + ?LOG(error, "Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{timer := TRef}) -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index c85aa5007..21562930b 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Tracer]"). + %% APIs -export([start_link/0]). @@ -121,10 +123,10 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T filters => [{meta_key_filter, {fun filter_by_meta_key/2, Who} }]}) of ok -> - ?LOG(info, "[Tracer] Start trace for ~p", [Who]), + ?LOG(info, "Start trace for ~p", [Who]), {reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}}; {error, Reason} -> - ?LOG(error, "[Tracer] Start trace for ~p failed, error: ~p", [Who, Reason]), + ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]), {reply, {error, Reason}, State} end; @@ -133,9 +135,9 @@ handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) -> {ok, _LogFile} -> case logger:remove_handler(handler_id(Who)) of ok -> - ?LOG(info, "[Tracer] Stop trace for ~p", [Who]); + ?LOG(info, "Stop trace for ~p", [Who]); {error, Reason} -> - ?LOG(error, "[Tracer] Stop trace for ~p failed, error: ~p", [Who, Reason]) + ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]) end, {reply, ok, State#state{traces = maps:remove(Who, Traces)}}; error -> @@ -146,15 +148,15 @@ handle_call(lookup_traces, _From, State = #state{traces = Traces}) -> {reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Tracer] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Tracer] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Tracer] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index b20ad3478..a97cff326 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -20,6 +20,8 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). +-logger_header("[WS Channel]"). + -export([ info/1 , attrs/1 , stats/1 @@ -143,11 +145,11 @@ websocket_init(#state{request = Req, options = Options}) -> WsCookie = try cowboy_req:parse_cookies(Req) catch error:badarg -> - ?LOG(error, "[WS Channel] Illegal cookie"), + ?LOG(error, "Illegal cookie"), undefined; Error:Reason -> ?LOG(error, - "[WS Channel] Cookie is parsed failed, Error: ~p, Reason ~p", + "Cookie is parsed failed, Error: ~p, Reason ~p", [Error, Reason]), undefined end, @@ -189,7 +191,7 @@ websocket_handle({binary, <<>>}, State) -> websocket_handle({binary, [<<>>]}, State) -> {ok, ensure_stats_timer(State)}; websocket_handle({binary, Data}, State = #state{parse_state = ParseState}) -> - ?LOG(debug, "[WS Channel] RECV ~p", [Data]), + ?LOG(debug, "RECV ~p", [Data]), BinSize = iolist_size(Data), emqx_pd:update_counter(recv_oct, BinSize), ok = emqx_metrics:inc('bytes.received', BinSize), @@ -204,11 +206,11 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState}) -> end, State#state{parse_state = NParseState}); {error, Reason} -> - ?LOG(error, "[WS Channel] Frame error: ~p", [Reason]), + ?LOG(error, "Frame error: ~p", [Reason]), shutdown(Reason, State) catch error:Reason:Stk -> - ?LOG(error, "[WS Channel] Parse failed for ~p~n\ + ?LOG(error, "Parse failed for ~p~n\ Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), shutdown(parse_error, State) end; @@ -255,12 +257,12 @@ websocket_info({timeout, Timer, emit_stats}, {ok, State#state{stats_timer = undefined}, hibernate}; websocket_info({keepalive, start, Interval}, State) -> - ?LOG(debug, "[WS Channel] Keepalive at the interval of ~p", [Interval]), + ?LOG(debug, "Keepalive at the interval of ~p", [Interval]), case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> - ?LOG(warning, "[WS Channel] Keepalive error: ~p", [Error]), + ?LOG(warning, "Keepalive error: ~p", [Error]), shutdown(Error, State) end; @@ -269,19 +271,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> - ?LOG(debug, "[WS Channel] Keepalive Timeout!"), + ?LOG(debug, "Keepalive Timeout!"), shutdown(keepalive_timeout, State); {error, Error} -> - ?LOG(error, "[WS Channel] Keepalive error: ~p", [Error]), + ?LOG(error, "Keepalive error: ~p", [Error]), shutdown(keepalive_error, State) end; websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(warning, "[WS Channel] Discarded by ~s:~p", [ClientId, ByPid]), + ?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "[WS Channel] Clientid '~s' conflict with ~p", [ClientId, NewPid]), + ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); websocket_info({binary, Data}, State) -> @@ -294,13 +296,13 @@ websocket_info({stop, Reason}, State) -> {stop, State#state{shutdown = Reason}}; websocket_info(Info, State) -> - ?LOG(error, "[WS Channel] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {ok, State}. terminate(SockError, _Req, #state{keepalive = Keepalive, proto_state = ProtoState, shutdown = Shutdown}) -> - ?LOG(debug, "[WS Channel] Terminated for ~p, sockerror: ~p", + ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Shutdown, SockError]), emqx_keepalive:cancel(Keepalive), case {ProtoState, Shutdown} of @@ -320,7 +322,7 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> {ok, NProtoState} -> SuccFun(State#state{proto_state = NProtoState}); {error, Reason} -> - ?LOG(error, "[WS Channel] Protocol error: ~p", [Reason]), + ?LOG(error, "Protocol error: ~p", [Reason]), shutdown(Reason, State); {error, Reason, NProtoState} -> shutdown(Reason, State#state{proto_state = NProtoState}); diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 65deeea24..d654487c5 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Zone]"). + %% APIs -export([start_link/0]). @@ -96,7 +98,7 @@ handle_call(force_reload, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Zone] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({set_env, Zone, Key, Val}, State) -> @@ -104,11 +106,11 @@ handle_cast({set_env, Zone, Key, Val}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[Zone] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Zone] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> From 09dd65b9fa73200e68350f46a1c1c87987ffee92 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 15 Jun 2019 13:13:52 +0800 Subject: [PATCH 03/13] Add logger header validation --- src/emqx_logger.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index 0a663de20..7f45843e3 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -173,6 +173,7 @@ trans([{eof, L} | AST], LogHeader, ResAST) -> trans([{attribute, _, module, _Mod} = M | AST], Header, ResAST) -> trans(AST, Header, [export_header_fun(), M | ResAST]); trans([{attribute, _, logger_header, Header} | AST], _, ResAST) -> + io_lib:printable_list(Header) orelse error({invalid_string, Header}), trans(AST, Header, ResAST); trans([F | AST], LogHeader, ResAST) -> trans(AST, LogHeader, [F | ResAST]). From ebeb2c554544e432ac12988a56f092111a197e9a Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 15 Jun 2019 15:00:49 +0800 Subject: [PATCH 04/13] Compile emqx_logger first --- rebar.config | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rebar.config b/rebar.config index 52bb8ead3..807c3cab1 100644 --- a/rebar.config +++ b/rebar.config @@ -24,6 +24,8 @@ {plugins, [coveralls]}. +{erl_first_files, ["src/emqx_logger.erl"]}. + {profiles, [{test, [{deps, From ffbb598a7348f4c156057bacce2f5a8956532a4e Mon Sep 17 00:00:00 2001 From: GilbertWong Date: Wed, 19 Jun 2019 16:09:59 +0800 Subject: [PATCH 05/13] Increase time precision for emqx_mod_presence --- src/emqx_mod_presence.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index cab5076f5..ef164102d 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -51,7 +51,7 @@ on_client_connected(#{client_id := ClientId, username => Username, ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), connack => ConnAck, - ts => os:system_time(second) + ts => erlang:system_time(millisecond) }) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); @@ -63,7 +63,7 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E case emqx_json:safe_encode([{clientid, ClientId}, {username, Username}, {reason, reason(Reason)}, - {ts, os:system_time(second)}]) of + {ts, erlang:system_time(millisecond)}]) of {ok, Payload} -> emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload)); {error, Reason} -> From bcae452e42ec12382f36ccff80ec4ef9b0c0ad9e Mon Sep 17 00:00:00 2001 From: GilbertWong Date: Sat, 15 Jun 2019 17:07:46 +0800 Subject: [PATCH 06/13] Fix the flapping bug Prior to this change, the banned until value will not be set correctly because of wrong spell of config entry name . This change fix this bug --- rebar.config | 6 +++--- src/emqx_cm_sup.erl | 3 +-- src/emqx_flapping.erl | 13 ++++++++----- src/emqx_protocol.erl | 2 +- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/rebar.config b/rebar.config index 807c3cab1..bd017de97 100644 --- a/rebar.config +++ b/rebar.config @@ -2,10 +2,10 @@ [ {jsx, "2.9.0"} % hex , {cowboy, "2.6.1"} % hex , {gproc, "0.8.0"} % hex + , {ekka, "0.5.6"} % hex + , {replayq, "0.1.1"} %hex + , {esockd, "5.5.0"} %hex , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.5"}}} - , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}} - , {esockd, "5.5.0"} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 19940da05..e054c53e7 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -30,9 +30,8 @@ init([]) -> shutdown => 1000, type => worker, modules => [emqx_banned]}, - FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000), Flapping = #{id => flapping, - start => {emqx_flapping, start_link, [FlappingOption]}, + start => {emqx_flapping, start_link, []}, restart => permanent, shutdown => 1000, type => worker, diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 099bf3910..2a5e66be1 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -19,7 +19,7 @@ -behaviour(gen_statem). --export([start_link/1]). +-export([start_link/0]). %% This module is used to garbage clean the flapping records @@ -33,6 +33,8 @@ -define(FLAPPING_TAB, ?MODULE). +-define(default_flapping_clean_interval, 3600000). + -export([check/3]). -record(flapping, @@ -96,11 +98,12 @@ check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval}, %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- --spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()). -start_link(TimerInterval) -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). +-spec(start_link() -> startlink_ret()). +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). -init([TimerInterval]) -> +init([]) -> + TimerInterval = emqx_config:get_env(flapping_clean_interval, ?default_flapping_clean_interval), TabOpts = [ public , set , {keypos, 2} diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 6eb1bb233..7d1ecc04e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -963,7 +963,7 @@ do_flapping_detect(Action, #pstate{zone = Zone, Threshold = emqx_zone:get_env(Zone, flapping_threshold, {10, 60}), case emqx_flapping:check(Action, ClientId, Threshold) of flapping -> - BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), + BanExpiryInterval = emqx_zone:get_env(Zone, flapping_banned_expiry_interval, 3600000), Until = erlang:system_time(second) + BanExpiryInterval, emqx_banned:add(#banned{who = {client_id, ClientId}, reason = <<"flapping">>, From 1d23d7de86d40e0e38df3ce0a118262bf1557578 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Thu, 20 Jun 2019 11:44:44 +0800 Subject: [PATCH 07/13] Optimize test workflow (#2642) * Optimize test suites workflow * Better format of makefile * Fix protocol suite * Add emqx_access_SUITE --- Makefile | 17 ++++----- src/emqx_mqtt_caps.erl | 6 ++++ test/emqx_access_SUITE.erl | 54 ++++++---------------------- test/emqx_mqtt_packet_SUITE.erl | 8 ++--- test/emqx_protocol_SUITE.erl | 2 +- test/emqx_request_response_SUITE.erl | 5 +-- 6 files changed, 31 insertions(+), 61 deletions(-) diff --git a/Makefile b/Makefile index 771fb9d96..14790e0e9 100644 --- a/Makefile +++ b/Makefile @@ -3,20 +3,17 @@ REBAR_GIT_CLONE_OPTIONS += --depth 1 export REBAR_GIT_CLONE_OPTIONS -# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ - emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ - emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ - emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_channel \ - emqx_packet emqx_channel emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ - emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping +SUITES_FILES := $(shell find test -name '*_SUITE.erl') + +CT_SUITES := $(foreach value,$(SUITES_FILES),$(shell val=$$(basename $(value) .erl); echo $${val%_*})) CT_NODE_NAME = emqxct@127.0.0.1 +.PHONY: cover +run: + @echo $(CT_TEST_SUITES) + compile: @rebar3 compile diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index ff38f687e..bde3e3eb5 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -24,6 +24,8 @@ , get_caps/2 ]). +-export([default_caps/0]). + -type(caps() :: #{max_packet_size => integer(), max_clientid_len => integer(), max_topic_alias => integer(), @@ -36,6 +38,7 @@ -export_type([caps/0]). -define(UNLIMITED, 0). + -define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE}, {max_clientid_len, ?MAX_CLIENTID_LEN}, {max_topic_alias, ?UNLIMITED}, @@ -119,6 +122,9 @@ check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) -> _ -> check_sub(Topic, Opts, Caps) end. +default_caps() -> + ?DEFAULT_CAPS. + get_caps(Zone, publish) -> with_env(Zone, '$mqtt_pub_caps', fun() -> diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index d57094925..9be887ce3 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -36,8 +36,6 @@ all() -> groups() -> [{access_control, [sequence], [reload_acl, - register_mod, - unregister_mod, check_acl_1, check_acl_2]}, {access_control_cache_mode, [], @@ -98,58 +96,26 @@ write_config(Filename, Terms) -> end_per_group(_Group, Config) -> Config. -init_per_testcase(_TestCase, Config) -> - ?AC:start_link(), - Config. -end_per_testcase(_TestCase, _Config) -> - ok. - -per_testcase_config(acl_cache_full, Config) -> - Config; -per_testcase_config(_TestCase, Config) -> - Config. - %%-------------------------------------------------------------------- %% emqx_access_control %%-------------------------------------------------------------------- reload_acl(_) -> - [ok] = ?AC:reload_acl(). - -register_mod(_) -> - ok = ?AC:register_mod(acl, emqx_acl_test_mod, []), - {emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)), - ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]), - ok = ?AC:register_mod(auth, emqx_auth_dashboard, [], 99), - [{emqx_auth_dashboard, _, 99}, - {emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth). - -unregister_mod(_) -> - ok = ?AC:register_mod(acl, emqx_acl_test_mod, []), - {emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)), - ok = ?AC:unregister_mod(acl, emqx_acl_test_mod), - timer:sleep(5), - {emqx_acl_internal, _, 0}= hd(?AC:lookup_mods(acl)), - ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]), - [{emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth), - - ok = ?AC:unregister_mod(auth, emqx_auth_anonymous_test_mod), - timer:sleep(5), - [] = ?AC:lookup_mods(auth). + ok = ?AC:reload_acl(). check_acl_1(_) -> - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), + deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). check_acl_2(_) -> - SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>}, + SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>, zone => external}, deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). acl_cache_basic(_) -> - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), @@ -162,7 +128,7 @@ acl_cache_basic(_) -> acl_cache_expiry(_) -> application:set_env(emqx, acl_cache_ttl, 100), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ct:sleep(150), @@ -172,7 +138,7 @@ acl_cache_expiry(_) -> acl_cache_full(_) -> application:set_env(emqx, acl_cache_max_size, 1), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), @@ -187,7 +153,7 @@ acl_cache_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 2), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), @@ -357,8 +323,8 @@ compile_rule(_) -> {deny, all} = compile({deny, all}). match_rule(_) -> - User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}}, - User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}}, + User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}, zone => external}, + User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}, zone => external}, {matched, allow} = match(User, <<"Test/Topic">>, {allow, all}), {matched, deny} = match(User, <<"Test/Topic">>, {deny, all}), diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl index 8ac76bfd9..e189ed69d 100644 --- a/test/emqx_mqtt_packet_SUITE.erl +++ b/test/emqx_mqtt_packet_SUITE.erl @@ -68,11 +68,11 @@ groups() -> [{connect, [sequence], ]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). init_per_group(_Group, Config) -> Config. @@ -85,7 +85,7 @@ case1_protocol_name(_) -> MqttPacket = serialize(?CASE1_PROTOCOL_NAME), emqx_client_sock:send(Sock, MqttPacket), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data), Disconnect = gen_tcp:recv(Sock, 0), ?assertEqual({error, closed}, Disconnect). @@ -95,7 +95,7 @@ case2_protocol_ver(_) -> emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), %% case1 Unacceptable protocol version - {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data), Disconnect = gen_tcp:recv(Sock, 0), ?assertEqual({error, closed}, Disconnect). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 60d298008..ec707e30d 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -55,7 +55,7 @@ groups() -> init_per_suite(Config) -> emqx_ct_helpers:start_apps([], fun set_special_configs/1), - MqttCaps = emqx_zone:get_env(external, '$mqtt_caps'), + MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()), emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}), Config. diff --git a/test/emqx_request_response_SUITE.erl b/test/emqx_request_response_SUITE.erl index 6709e958e..373619f1d 100644 --- a/test/emqx_request_response_SUITE.erl +++ b/test/emqx_request_response_SUITE.erl @@ -22,10 +22,11 @@ -include_lib("common_test/include/ct.hrl"). init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). + emqx_ct_helpers:start_apps([]), + Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). all() -> [request_response]. From 138d7727faa6392ade07d31a0c14c0a5c6765074 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Thu, 20 Jun 2019 11:45:44 +0800 Subject: [PATCH 08/13] Compact windows platform which has no cpu_sup util function. (#2629) * Compact windows platform which has no cpu_sup util function. * Fix wrong os type clause * Do not start timer when platform is windows --- src/emqx_os_mon.erl | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 2b58a3749..21624c079 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -47,6 +47,11 @@ -define(OS_MON, ?MODULE). +-define(compat_windows(Expression), case os:type() of + {win32, nt} -> windows; + _Unix -> Expression + end). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -95,7 +100,7 @@ set_procmem_high_watermark(Float) -> %%------------------------------------------------------------------------------ init([Opts]) -> - _ = cpu_sup:util(), + _ = ?compat_windows(cpu_sup:util()), set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)), set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)), set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)), @@ -126,16 +131,18 @@ handle_call(_Request, _From, State) -> handle_cast(_Request, State) -> {noreply, State}. -handle_info({timeout, Timer, check}, State = #{timer := Timer, +handle_info({timeout, Timer, check}, State = #{timer := Timer, cpu_high_watermark := CPUHighWatermark, cpu_low_watermark := CPULowWatermark, is_cpu_alarm_set := IsCPUAlarmSet}) -> - case cpu_sup:util() of + case ?compat_windows(cpu_sup:util()) of 0 -> {noreply, State#{timer := undefined}}; {error, Reason} -> ?LOG(error, "Failed to get cpu utilization: ~p", [Reason]), {noreply, ensure_check_timer(State)}; + windows -> + {noreply, State}; Busy when Busy / 100 >= CPUHighWatermark -> alarm_handler:set_alarm({cpu_high_watermark, Busy}), {noreply, ensure_check_timer(State#{is_cpu_alarm_set := true})}; @@ -163,4 +170,3 @@ call(Req) -> ensure_check_timer(State = #{cpu_check_interval := Interval}) -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. - From 37eef7b72ab698763eaa5739a6957f6aa14c6a9d Mon Sep 17 00:00:00 2001 From: tigercl Date: Fri, 21 Jun 2019 19:21:26 +0800 Subject: [PATCH 09/13] Add 'auth.mqtt.anonymous' metric (#2631) * Add 'auth.mqtt.anonymous' metric --- src/emqx_access_control.erl | 5 ++++- src/emqx_metrics.erl | 7 ++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index d6aab26d4..7d849f493 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -29,7 +29,10 @@ -> {ok, emqx_types:credentials()} | {error, term()}). authenticate(Credentials) -> case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of - #{auth_result := success} = NewCredentials -> + #{auth_result := success, anonymous := true} = NewCredentials -> + emqx_metrics:inc('auth.mqtt.anonymous'), + {ok, NewCredentials}; + #{auth_result := success} = NewCredentials -> {ok, NewCredentials}; NewCredentials -> {error, maps:get(auth_result, NewCredentials, unknown_error)} diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index fdf66a493..fc04ab597 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -132,6 +132,10 @@ {counter, 'messages.forward'} % Messages forward ]). +-define(MQTT_METRICS, [ + {counter, 'auth.mqtt.anonymous'} +]). + -record(state, {next_idx = 1}). -record(metric, {name, type, idx}). @@ -355,7 +359,7 @@ init([]) -> Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)}, true = ets:insert(?TAB, Metric), ok = counters:put(CRef, Idx, 0) - end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), + end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS), {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> @@ -446,4 +450,5 @@ reserved_idx('messages.retained') -> 48; reserved_idx('messages.dropped') -> 49; reserved_idx('messages.expired') -> 50; reserved_idx('messages.forward') -> 51; +reserved_idx('auth.mqtt.anonymous') -> 52; reserved_idx(_) -> undefined. From 0c7c4ee4170b2280550a18df04ad4bcc654e99d4 Mon Sep 17 00:00:00 2001 From: tigercl Date: Fri, 21 Jun 2019 19:51:58 +0800 Subject: [PATCH 10/13] Using fixed topic for system message of alarm (#2647) * Using fixed topic for system message of alarm --- src/emqx_alarm_handler.erl | 23 +++++++++++++++-------- test/emqx_alarm_handler_SUITE.erl | 4 ++-- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index 51e112779..a7654e32c 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -98,7 +98,7 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> ?LOG(warning, "~p set", [Alarm]), case encode_alarm(Alarm) of {ok, Json} -> - emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json)); + emqx_broker:safe_publish(alarm_msg(topic(alert), Json)); {error, Reason} -> ?LOG(error, "Failed to encode alarm: ~p", [Reason]) end, @@ -106,7 +106,12 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> {ok, State}; handle_event({clear_alarm, AlarmId}, State) -> ?LOG(notice, "~p clear", [AlarmId]), - emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)), + case encode_alarm({AlarmId, undefined}) of + {ok, Json} -> + emqx_broker:safe_publish(alarm_msg(topic(clear), Json)); + {error, Reason} -> + ?LOG(error, "Failed to encode alarm: ~p", [Reason]) + end, clear_alarm_(AlarmId), {ok, State}; handle_event(_, State) -> @@ -142,19 +147,21 @@ encode_alarm({AlarmId, #alarm{severity = Severity, {title, iolist_to_binary(Title)}, {summary, iolist_to_binary(Summary)}, {ts, emqx_time:now_secs(Ts)}]}]); +encode_alarm({AlarmId, undefined}) -> + emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]); encode_alarm({AlarmId, AlarmDesc}) -> - emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, - {desc, maybe_to_binary(AlarmDesc)}]). + emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, + {description, maybe_to_binary(AlarmDesc)}]). alarm_msg(Topic, Payload) -> Msg = emqx_message:make(?MODULE, Topic, Payload), emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, emqx_message:set_flag(sys, Msg)). -topic(alert, AlarmId) -> - emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); -topic(clear, AlarmId) -> - emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>). +topic(alert) -> + emqx_topic:systop(<<"alarms/alert">>); +topic(clear) -> + emqx_topic:systop(<<"alarms/clear">>). maybe_to_binary(Data) when is_binary(Data) -> Data; diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index 25a8303e6..1e7223320 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -62,8 +62,8 @@ t_alarm_handler(_) -> {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), - Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>), - Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>), + Topic1 = emqx_topic:systop(<<"alarms/alert">>), + Topic2 = emqx_topic:systop(<<"alarms/clear">>), SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}, emqx_client_sock:send(Sock, raw_send_serialize( From 481458d8ec8eaf9ed11f34a2d3615e0b37bb4453 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 21 Jun 2019 14:09:41 +0800 Subject: [PATCH 11/13] Add plugin type --- include/emqx.hrl | 3 ++- src/emqx_plugins.erl | 15 +++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/include/emqx.hrl b/include/emqx.hrl index fba079b91..e206ad51d 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -132,7 +132,8 @@ descr :: string(), vendor :: string(), active = false :: boolean(), - info :: map() + info :: map(), + type :: atom() }). %%-------------------------------------------------------------------- diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index b88fce472..cedb5c5b0 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -151,20 +151,20 @@ stop_plugins(Names) -> -spec(list() -> [emqx_types:plugin()]). list() -> StartedApps = names(started_app), - lists:map(fun({Name, _, _}) -> - Plugin = plugin(Name), + lists:map(fun({Name, _, [Type| _]}) -> + Plugin = plugin(Name, Type), case lists:member(Name, StartedApps) of true -> Plugin#plugin{active = true}; false -> Plugin end end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))). -plugin(AppName) -> +plugin(AppName, Type) -> case application:get_all_key(AppName) of {ok, Attrs} -> Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), - #plugin{name = AppName, version = Ver, descr = Descr}; + #plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)}; undefined -> error({plugin_not_found, AppName}) end. @@ -316,3 +316,10 @@ write_loaded(AppNames) -> ?LOG(error, "Open File ~p Error: ~p", [File, Error]), {error, Error} end. + +plugin_type(auth) -> auth; +plugin_type(protocol) -> protocol; +plugin_type(backend) -> backend; +plugin_type(bridge) -> bridge; +plugin_type(_) -> feature. + From 20188f91890bc9382d989746cb9e5ae4f42fa957 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Fri, 21 Jun 2019 20:52:27 +0800 Subject: [PATCH 12/13] Optimize develop workflow and support make run and code hot swapping (#2644) * Optimize develop workflow and support make run and code hot swapping --- .gitignore | 1 + Makefile | 36 ++++++++++++++++++++++++++++++++---- rebar.config | 22 +++++++++++----------- test/run_emqx.escript | 13 +++++++++++++ 4 files changed, 57 insertions(+), 15 deletions(-) create mode 100644 test/run_emqx.escript diff --git a/.gitignore b/.gitignore index 16369a576..aaec950d4 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ xrefr erlang.mk *.coverdata etc/emqx.conf.rendered +Mnesia.*/ diff --git a/Makefile b/Makefile index 14790e0e9..bf1da5bc5 100644 --- a/Makefile +++ b/Makefile @@ -3,16 +3,43 @@ REBAR_GIT_CLONE_OPTIONS += --depth 1 export REBAR_GIT_CLONE_OPTIONS - SUITES_FILES := $(shell find test -name '*_SUITE.erl') CT_SUITES := $(foreach value,$(SUITES_FILES),$(shell val=$$(basename $(value) .erl); echo $${val%_*})) CT_NODE_NAME = emqxct@127.0.0.1 -.PHONY: cover -run: - @echo $(CT_TEST_SUITES) +RUN_NODE_NAME = emqxdebug@127.0.0.1 + +.PHONY: run +run: run_setup + @rebar3 as test get-deps + @rebar3 as test auto --name $(RUN_NODE_NAME) --script test/run_emqx.escript + +.PHONY: run_setup +run_setup: + @erl -noshell -eval \ + "{ok, [[HOME]]} = init:get_argument(home), \ + FilePath = HOME ++ \"/.config/rebar3/rebar.config\", \ + case file:consult(FilePath) of \ + {ok, Term} -> \ + NewTerm = case lists:keyfind(plugins, 1, Term) of \ + false -> [{plugins, [rebar3_auto]} | Term]; \ + {plugins, OldPlugins} -> \ + NewPlugins0 = OldPlugins -- [rebar3_auto], \ + NewPlugins = [rebar3_auto | NewPlugins0], \ + lists:keyreplace(plugins, 1, Term, {plugins, NewPlugins}) \ + end, \ + ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]); \ + _ -> \ + NewTerm=[{plugins, [rebar3_auto]}], \ + ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]) \ + end, \ + halt(0)." + +.PHONY: shell +shell: + @rebar3 as test auto compile: @rebar3 compile @@ -89,5 +116,6 @@ gen-clean: .PHONY: distclean distclean: gen-clean + @rm -rf Mnesia.* @rm -rf _build cover deps logs log data @rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump diff --git a/rebar.config b/rebar.config index bd017de97..22fbccaf9 100644 --- a/rebar.config +++ b/rebar.config @@ -1,12 +1,12 @@ {deps, - [ {jsx, "2.9.0"} % hex - , {cowboy, "2.6.1"} % hex - , {gproc, "0.8.0"} % hex - , {ekka, "0.5.6"} % hex - , {replayq, "0.1.1"} %hex - , {esockd, "5.5.0"} %hex - , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} - , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} + [{jsx, "2.9.0"}, % hex + {cowboy, "2.6.1"}, % hex + {gproc, "0.8.0"}, % hex + {ekka, "0.5.6"}, % hex + {replayq, "0.1.1"}, %hex + {esockd, "5.5.0"}, %hex + {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}, + {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. {edoc_opts, [{preprocess, true}]}. @@ -29,9 +29,9 @@ {profiles, [{test, [{deps, - [ {meck, "0.8.13"} % hex - , {bbmustache, "1.7.0"} % hex - , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}} + [{meck, "0.8.13"}, % hex + {bbmustache, "1.7.0"}, % hex + {emqx_ct_helpers, "1.1.3"} % hex ]} ]} ]}. diff --git a/test/run_emqx.escript b/test/run_emqx.escript new file mode 100644 index 000000000..e8d1e2aae --- /dev/null +++ b/test/run_emqx.escript @@ -0,0 +1,13 @@ +#!/usr/bin/env escript + +main(_) -> + start(). + +start() -> + SpecEmqxConfig = fun(_) -> ok end, + start(SpecEmqxConfig). + +start(SpecEmqxConfig) -> + SchemaPath = filename:join(["priv", "emqx.schema"]), + ConfPath = filename:join(["etc", "emqx.conf"]), + emqx_ct_helpers:start_app(emqx, SchemaPath, ConfPath, SpecEmqxConfig). From 475cabde4d175f6288c58fbc4a1faf03d9af594e Mon Sep 17 00:00:00 2001 From: Gilbert Date: Fri, 21 Jun 2019 21:21:49 +0800 Subject: [PATCH 13/13] Fix issue#2619 (#2646) * Fix issue#2619 Prior to this change, websocket connection would not be disconnected when dataframe type is other frametype. However, in mqtt spec, it shoud be disconnected. This change fix this inconsistent behaviour with mqtt 5.0 --- src/emqx_ws_channel.erl | 6 +++++- test/emqx_ws_channel_SUITE.erl | 13 +++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index a97cff326..771883be2 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -221,7 +221,11 @@ websocket_handle(Frame, State) {ok, ensure_stats_timer(State)}; websocket_handle({FrameType, _}, State) when FrameType =:= ping; FrameType =:= pong -> - {ok, ensure_stats_timer(State)}. + {ok, ensure_stats_timer(State)}; +%% According to mqtt spec[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901285] +websocket_handle({_OtherFrameType, _}, State) -> + ?LOG(error, "Frame error: Other type of data frame"), + shutdown(other_frame_type, State). websocket_info({call, From, info}, State) -> gen_server:reply(From, info(State)), diff --git a/test/emqx_ws_channel_SUITE.erl b/test/emqx_ws_channel_SUITE.erl index 29f02e653..ada11a95a 100644 --- a/test/emqx_ws_channel_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -31,6 +31,7 @@ all() -> [ t_ws_connect_api , t_ws_auth_failure + , t_ws_other_type_frame ]. init_per_suite(Config) -> @@ -71,6 +72,18 @@ t_ws_connect_api(_Config) -> {close, _} = rfc6455_client:close(WS), ok. +t_ws_other_type_frame(_Config) -> + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + ok = rfc6455_client:send_binary(WS, raw_send_serialize(?CLIENT)), + {binary, Bin} = rfc6455_client:recv(WS), + Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), + {ok, Connack, <<>>, _} = raw_recv_pase(Bin), + rfc6455_client:send(WS, <<"testdata">>), + timer:sleep(1000), + ?assertEqual(undefined, erlang:process_info(WS)), + ok. + raw_send_serialize(Packet) -> emqx_frame:serialize(Packet).