diff --git a/.gitignore b/.gitignore index 16369a576..aaec950d4 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ xrefr erlang.mk *.coverdata etc/emqx.conf.rendered +Mnesia.*/ diff --git a/Makefile b/Makefile index 771fb9d96..bf1da5bc5 100644 --- a/Makefile +++ b/Makefile @@ -3,20 +3,44 @@ REBAR_GIT_CLONE_OPTIONS += --depth 1 export REBAR_GIT_CLONE_OPTIONS -# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat +SUITES_FILES := $(shell find test -name '*_SUITE.erl') -CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ - emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ - emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ - emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_channel \ - emqx_packet emqx_channel emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ - emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping +CT_SUITES := $(foreach value,$(SUITES_FILES),$(shell val=$$(basename $(value) .erl); echo $${val%_*})) CT_NODE_NAME = emqxct@127.0.0.1 +RUN_NODE_NAME = emqxdebug@127.0.0.1 + +.PHONY: run +run: run_setup + @rebar3 as test get-deps + @rebar3 as test auto --name $(RUN_NODE_NAME) --script test/run_emqx.escript + +.PHONY: run_setup +run_setup: + @erl -noshell -eval \ + "{ok, [[HOME]]} = init:get_argument(home), \ + FilePath = HOME ++ \"/.config/rebar3/rebar.config\", \ + case file:consult(FilePath) of \ + {ok, Term} -> \ + NewTerm = case lists:keyfind(plugins, 1, Term) of \ + false -> [{plugins, [rebar3_auto]} | Term]; \ + {plugins, OldPlugins} -> \ + NewPlugins0 = OldPlugins -- [rebar3_auto], \ + NewPlugins = [rebar3_auto | NewPlugins0], \ + lists:keyreplace(plugins, 1, Term, {plugins, NewPlugins}) \ + end, \ + ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]); \ + _ -> \ + NewTerm=[{plugins, [rebar3_auto]}], \ + ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]) \ + end, \ + halt(0)." + +.PHONY: shell +shell: + @rebar3 as test auto + compile: @rebar3 compile @@ -92,5 +116,6 @@ gen-clean: .PHONY: distclean distclean: gen-clean + @rm -rf Mnesia.* @rm -rf _build cover deps logs log data @rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump diff --git a/include/emqx.hrl b/include/emqx.hrl index fba079b91..e206ad51d 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -132,7 +132,8 @@ descr :: string(), vendor :: string(), active = false :: boolean(), - info :: map() + info :: map(), + type :: atom() }). %%-------------------------------------------------------------------- diff --git a/include/logger.hrl b/include/logger.hrl index 503b6cf21..1cf5facc6 100644 --- a/include/logger.hrl +++ b/include/logger.hrl @@ -14,6 +14,8 @@ %% debug | info | notice | warning | error | critical | alert | emergency +-compile({parse_transform, emqx_logger}). + -define(DEBUG(Format), ?LOG(debug, Format, [])). -define(DEBUG(Format, Args), ?LOG(debug, Format, Args)). @@ -39,5 +41,5 @@ -define(LOG(Level, Format, Args), begin - (logger:log(Level,#{},#{report_cb => fun(_) -> {(Format), (Args)} end})) + (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end})) end). diff --git a/rebar.config b/rebar.config index 52bb8ead3..22fbccaf9 100644 --- a/rebar.config +++ b/rebar.config @@ -1,12 +1,12 @@ {deps, - [ {jsx, "2.9.0"} % hex - , {cowboy, "2.6.1"} % hex - , {gproc, "0.8.0"} % hex - , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.5"}}} - , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}} - , {esockd, "5.5.0"} - , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} + [{jsx, "2.9.0"}, % hex + {cowboy, "2.6.1"}, % hex + {gproc, "0.8.0"}, % hex + {ekka, "0.5.6"}, % hex + {replayq, "0.1.1"}, %hex + {esockd, "5.5.0"}, %hex + {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}, + {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. {edoc_opts, [{preprocess, true}]}. @@ -24,12 +24,14 @@ {plugins, [coveralls]}. +{erl_first_files, ["src/emqx_logger.erl"]}. + {profiles, [{test, [{deps, - [ {meck, "0.8.13"} % hex - , {bbmustache, "1.7.0"} % hex - , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}} + [{meck, "0.8.13"}, % hex + {bbmustache, "1.7.0"}, % hex + {emqx_ct_helpers, "1.1.3"} % hex ]} ]} ]}. diff --git a/src/emqx.erl b/src/emqx.erl index c126b262c..3c7622887 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -18,6 +18,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[EMQ X]"). + %% Start/Stop the application -export([ start/0 , restart/1 @@ -183,7 +185,7 @@ shutdown() -> shutdown(normal). shutdown(Reason) -> - ?LOG(critical, "[EMQ X] emqx shutdown for ~s", [Reason]), + ?LOG(critical, "emqx shutdown for ~s", [Reason]), emqx_alarm_handler:unload(), emqx_plugins:unload(), lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]). diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index d6aab26d4..7d849f493 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -29,7 +29,10 @@ -> {ok, emqx_types:credentials()} | {error, term()}). authenticate(Credentials) -> case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of - #{auth_result := success} = NewCredentials -> + #{auth_result := success, anonymous := true} = NewCredentials -> + emqx_metrics:inc('auth.mqtt.anonymous'), + {ok, NewCredentials}; + #{auth_result := success} = NewCredentials -> {ok, NewCredentials}; NewCredentials -> {error, maps:get(auth_result, NewCredentials, unknown_error)} diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index e7d74eee8..a7654e32c 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Alarm Handler]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -93,18 +95,23 @@ init(_) -> handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) -> handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State); handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> - ?LOG(warning, "[Alarm Handler] ~p set", [Alarm]), + ?LOG(warning, "~p set", [Alarm]), case encode_alarm(Alarm) of {ok, Json} -> - emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json)); + emqx_broker:safe_publish(alarm_msg(topic(alert), Json)); {error, Reason} -> - ?LOG(error, "[Alarm Handler] Failed to encode alarm: ~p", [Reason]) + ?LOG(error, "Failed to encode alarm: ~p", [Reason]) end, set_alarm_(AlarmId, AlarmDesc), {ok, State}; handle_event({clear_alarm, AlarmId}, State) -> - ?LOG(notice, "[Alarm Handler] ~p clear", [AlarmId]), - emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)), + ?LOG(notice, "~p clear", [AlarmId]), + case encode_alarm({AlarmId, undefined}) of + {ok, Json} -> + emqx_broker:safe_publish(alarm_msg(topic(clear), Json)); + {error, Reason} -> + ?LOG(error, "Failed to encode alarm: ~p", [Reason]) + end, clear_alarm_(AlarmId), {ok, State}; handle_event(_, State) -> @@ -140,19 +147,21 @@ encode_alarm({AlarmId, #alarm{severity = Severity, {title, iolist_to_binary(Title)}, {summary, iolist_to_binary(Summary)}, {ts, emqx_time:now_secs(Ts)}]}]); +encode_alarm({AlarmId, undefined}) -> + emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]); encode_alarm({AlarmId, AlarmDesc}) -> - emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, - {desc, maybe_to_binary(AlarmDesc)}]). + emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, + {description, maybe_to_binary(AlarmDesc)}]). alarm_msg(Topic, Payload) -> Msg = emqx_message:make(?MODULE, Topic, Payload), emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, emqx_message:set_flag(sys, Msg)). -topic(alert, AlarmId) -> - emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); -topic(clear, AlarmId) -> - emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>). +topic(alert) -> + emqx_topic:systop(<<"alarms/alert">>); +topic(clear) -> + emqx_topic:systop(<<"alarms/clear">>). maybe_to_binary(Data) when is_binary(Data) -> Data; diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 126562401..11378461c 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Banned]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -88,11 +90,11 @@ init([]) -> {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - ?LOG(error, "[Banned] unexpected call: ~p", [Req]), + ?LOG(error, "unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Banned] unexpected msg: ~p", [Msg]), + ?LOG(error, "unexpected msg: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> @@ -100,7 +102,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> - ?LOG(error, "[Banned] unexpected info: ~p", [Info]), + ?LOG(error, "unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{expiry_timer := TRef}) -> diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index a01837659..5b5ab36bd 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -113,6 +113,8 @@ -include("logger.hrl"). -include("emqx_mqtt.hrl"). +-logger_header("[Bridge]"). + %% same as default in-flight limit for emqx_client -define(DEFAULT_BATCH_COUNT, 32). -define(DEFAULT_BATCH_BYTES, 1 bsl 20). @@ -304,7 +306,7 @@ standing_by({call, From}, ensure_started, State) -> standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; standing_by(info, Info, State) -> - ?LOG(info, "[Bridge] Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), + ?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), {keep_state_and_data, State}; standing_by(Type, Content, State) -> common(standing_by, Type, Content, State). @@ -360,7 +362,7 @@ connected(info, {disconnected, ConnRef, Reason}, #{conn_ref := ConnRefCurrent} = State) -> case ConnRefCurrent =:= ConnRef of true -> - ?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]), + ?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]), {next_state, connecting, State#{conn_ref => undefined, connection => undefined}}; false -> @@ -372,7 +374,7 @@ connected(info, {batch_ack, Ref}, State) -> keep_state_and_data; bad_order -> %% try re-connect then re-send - ?LOG(error, "[Bridge] Bad order ack received by bridge ~p", [name()]), + ?LOG(error, "Bad order ack received by bridge ~p", [name()]), {next_state, connecting, disconnect(State)}; {ok, NewState} -> {keep_state, NewState, ?maybe_send} @@ -403,7 +405,7 @@ common(_StateName, info, {dispatch, _, Msg}, NewQ = replayq:append(Q, collect([Msg])), {keep_state, State#{replayq => NewQ}, ?maybe_send}; common(StateName, Type, Content, State) -> - ?LOG(notice, "[Bridge] Bridge ~p discarded ~p type event at state ~p:\n~p", + ?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:\n~p", [name(), Type, StateName, Content]), {keep_state, State}. @@ -459,7 +461,7 @@ do_connect(Type, StateName, #{ forwards := Forwards end, case ConnectFun(Subs) of {ok, ConnRef, Conn} -> - ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), + ?LOG(info, "Bridge ~p connected", [name()]), State0 = State#{conn_ref => ConnRef, connection => Conn}, State1 = eval_bridge_handler(State0, connected), StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]}, @@ -515,7 +517,7 @@ retry_inflight(#{inflight := Inflight} = State, {ok, NewState} -> retry_inflight(NewState, T); {error, Reason} -> - ?LOG(error, "[Bridge] Inflight retry failed\n~p", [Reason]), + ?LOG(error, "Inflight retry failed\n~p", [Reason]), {error, State#{inflight := Inflight ++ Remain}} end. @@ -546,7 +548,7 @@ do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) -> batch => Batch}], {ok, State#{inflight := NewInflight}}; {error, Reason} -> - ?LOG(info, "[Bridge] Batch produce failed\n~p", [Reason]), + ?LOG(info, "Batch produce failed\n~p", [Reason]), {error, State} end. diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl index 8685451ae..480129903 100644 --- a/src/emqx_bridge_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -31,6 +31,8 @@ -include("logger.hrl"). +-logger_header("[Bridge Connect]"). + %% establish the connection to remote node/cluster %% protal worker (the caller process) should be expecting %% a message {disconnected, conn_ref()} when disconnected. @@ -54,7 +56,7 @@ start(Module, Config) -> {ok, Ref, Conn}; {error, Reason} -> Config1 = obfuscate(Config), - ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n" + ?LOG(error, "Failed to connect with module=~p\n" "config=~p\nreason:~p", [Module, Config1, Reason]), {error, Reason} end. diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index a40e7b2e3..baf86fd70 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -17,6 +17,8 @@ -include("logger.hrl"). +-logger_header("[Bridge]"). + %% APIs -export([ start_link/0 , start_link/1 @@ -74,6 +76,6 @@ drop_bridge(Id) -> ok -> supervisor:delete_child(?SUP, Id); Error -> - ?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]), + ?LOG(error, "Delete bridge failed, error : ~p", [Error]), Error end. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index cccc0bcd7..b9a4eb1bd 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Broker]"). + -export([start_link/2]). %% PubSub @@ -195,7 +197,7 @@ publish(Msg) when is_record(Msg, message) -> Headers = Msg#message.headers, case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of #message{headers = #{allow_publish := false}} -> - ?LOG(notice, "[Broker] Publishing interrupted: ~s", [emqx_message:format(Msg)]), + ?LOG(notice, "Publishing interrupted: ~s", [emqx_message:format(Msg)]), []; #message{topic = Topic} = Msg1 -> Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), @@ -209,7 +211,7 @@ safe_publish(Msg) when is_record(Msg, message) -> publish(Msg) catch _:Error:Stacktrace -> - ?LOG(error, "[Broker] Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) + ?LOG(error, "Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) after ok end. @@ -256,7 +258,7 @@ forward(Node, To, Delivery) -> %% rpc:call to ensure the delivery, but the latency:( case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> - ?LOG(error, "[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]), + ?LOG(error, "Failed to forward msg to ~s: ~p", [Node, Reason]), Delivery; Delivery1 -> Delivery1 end. @@ -424,14 +426,14 @@ handle_call({subscribe, Topic, I}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Broker] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({subscribe, Topic}, State) -> case emqx_router:do_add_route(Topic) of ok -> ok; {error, Reason} -> - ?LOG(error, "[Broker] Failed to add route: ~p", [Reason]) + ?LOG(error, "Failed to add route: ~p", [Reason]) end, {noreply, State}; @@ -454,11 +456,11 @@ handle_cast({unsubscribed, Topic, I}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[Broker] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Broker] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index 8ece1805b..9e9b337cf 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Broker Helper]"). + -export([start_link/0]). %% APIs @@ -110,7 +112,7 @@ init([]) -> {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?LOG(error, "[Broker Helper] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> @@ -119,7 +121,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}}; handle_cast(Msg, State) -> - ?LOG(error, "[Broker Helper] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) -> @@ -130,7 +132,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) {noreply, State#{pmon := PMon1}}; handle_info(Info, State) -> - ?LOG(error, "[Broker Helper] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 7e7b7f4c1..2a16bb348 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -22,6 +22,8 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). +-logger_header("[Channel]"). + -export([start_link/3]). %% APIs @@ -288,18 +290,18 @@ handle({call, From}, session, State = #state{proto_state = ProtoState}) -> reply(From, emqx_protocol:session(ProtoState), State); handle({call, From}, Req, State) -> - ?LOG(error, "[Channel] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), reply(From, ignored, State); %% Handle cast handle(cast, Msg, State) -> - ?LOG(error, "[Channel] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {keep_state, State}; %% Handle Incoming handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> Oct = iolist_size(Data), - ?LOG(debug, "[Channel] RECV ~p", [Data]), + ?LOG(debug, "RECV ~p", [Data]), emqx_pd:update_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), NState = ensure_stats_timer(maybe_gc({1, Oct}, State)), @@ -348,23 +350,23 @@ handle(info, {timeout, Timer, emit_stats}, GcState1 = emqx_gc:reset(GcState), {keep_state, NState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> - ?LOG(error, "[Channel] Shutdown exceptionally due to ~p", [Reason]), + ?LOG(error, "Shutdown exceptionally due to ~p", [Reason]), shutdown(Reason, NState) end; handle(info, {shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(error, "[Channel] Discarded by ~s:~p", [ClientId, ByPid]), + ?LOG(error, "Discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "[Channel] Clientid '~s' conflict with ~p", [ClientId, NewPid]), + ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); handle(info, {shutdown, Reason}, State) -> shutdown(Reason, State); handle(info, Info, State) -> - ?LOG(error, "[Channel] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {keep_state, State}. code_change(_Vsn, State, Data, _Extra) -> @@ -374,7 +376,7 @@ terminate(Reason, _StateName, #state{transport = Transport, socket = Socket, keepalive = KeepAlive, proto_state = ProtoState}) -> - ?LOG(debug, "[Channel] Terminated for ~p", [Reason]), + ?LOG(debug, "Terminated for ~p", [Reason]), Transport:fast_close(Socket), emqx_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -403,7 +405,7 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> shutdown(Reason, State) catch error:Reason:Stk -> - ?LOG(error, "[Channel] Parse failed for ~p~n\ + ?LOG(error, "Parse failed for ~p~n\ Stacktrace:~p~nError data:~p", [Reason, Stk, Data]), shutdown(parse_error, State) end. @@ -445,7 +447,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> - ?LOG(debug, "[Channel] Rate limit pause connection ~pms", [Pause]), + ?LOG(debug, "Rate limit pause connection ~pms", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1) end. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index e6dbea27b..41ff2f72e 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -22,6 +22,8 @@ -include("types.hrl"). -include("emqx_client.hrl"). +-logger_header("[Client]"). + -export([ start_link/0 , start_link/1 ]). @@ -794,10 +796,10 @@ connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) - Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), State#state{inflight = Inflight1}; {value, {pubrel, _Ref, _Ts}} -> - ?LOG(notice, "[Client] Duplicated PUBREC Packet: ~p", [PacketId]), + ?LOG(notice, "Duplicated PUBREC Packet: ~p", [PacketId]), State; none -> - ?LOG(warning, "[Client] Unexpected PUBREC Packet: ~p", [PacketId]), + ?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId]), State end); @@ -812,7 +814,7 @@ connected(cast, ?PUBREL_PACKET(PacketId), false -> {keep_state, NewState} end; error -> - ?LOG(warning, "[Client] Unexpected PUBREL: ~p", [PacketId]), + ?LOG(warning, "Unexpected PUBREL: ~p", [PacketId]), keep_state_and_data end; @@ -911,37 +913,37 @@ handle_event({call, From}, stop, _StateName, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> - ?LOG(debug, "[Client] RECV Data: ~p", [Data]), + ?LOG(debug, "RECV Data: ~p", [Data]), process_incoming(Data, [], run_sock(State)); handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> - ?LOG(error, "[Client] The connection error occured ~p, reason:~p", [Error, Reason]), + ?LOG(error, "The connection error occured ~p, reason:~p", [Error, Reason]), {stop, {shutdown, Reason}, State}; handle_event(info, {Closed, _Sock}, _StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> - ?LOG(debug, "[Client] ~p", [Closed]), + ?LOG(debug, "~p", [Closed]), {stop, {shutdown, Closed}, State}; handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) -> - ?LOG(debug, "[Client] Got EXIT from owner, Reason: ~p", [Reason]), + ?LOG(debug, "Got EXIT from owner, Reason: ~p", [Reason]), {stop, {shutdown, Reason}, State}; handle_event(info, {inet_reply, _Sock, ok}, _, _State) -> keep_state_and_data; handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> - ?LOG(error, "[Client] Got tcp error: ~p", [Reason]), + ?LOG(error, "Got tcp error: ~p", [Reason]), {stop, {shutdown, Reason}, State}; handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) -> - ?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)", + ?LOG(info, "State: ~s, Unexpected Event: (info, ~p)", [StateName, EventContent]), keep_state_and_data; handle_event(EventType, EventContent, StateName, _StateData) -> - ?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)", + ?LOG(error, "State: ~s, Unexpected Event: (~p, ~p)", [StateName, EventType, EventContent]), keep_state_and_data. @@ -984,7 +986,7 @@ delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties), properties => Properties}), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - ?LOG(warning, "[Client] Unexpected PUBACK: ~p", [PacketId]), + ?LOG(warning, "Unexpected PUBACK: ~p", [PacketId]), State end; delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), @@ -996,7 +998,7 @@ delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), properties => Properties}), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - ?LOG(warning, "[Client] Unexpected PUBCOMP Packet: ~p", [PacketId]), + ?LOG(warning, "Unexpected PUBCOMP Packet: ~p", [PacketId]), State end. @@ -1200,7 +1202,7 @@ send(Msg, State) when is_record(Msg, mqtt_msg) -> send(Packet, State = #state{socket = Sock, proto_ver = Ver}) when is_record(Packet, mqtt_packet) -> Data = emqx_frame:serialize(Packet, #{version => Ver}), - ?LOG(debug, "[Client] SEND Data: ~1000p", [Packet]), + ?LOG(debug, "SEND Data: ~1000p", [Packet]), case emqx_client_sock:send(Sock, Data) of ok -> {ok, bump_last_packet_id(State)}; Error -> Error diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index d78e99465..852338854 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[CM]"). + -export([start_link/0]). -export([ register_connection/1 @@ -159,7 +161,7 @@ init([]) -> {ok, #{conn_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?LOG(error, "[CM] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) -> @@ -169,7 +171,7 @@ handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) -> {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}}; handle_cast(Msg, State) -> - ?LOG(error, "[CM] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) -> @@ -180,7 +182,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon} {noreply, State#{conn_pmon := PMon1}}; handle_info(Info, State) -> - ?LOG(error, "[CM] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 19940da05..e054c53e7 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -30,9 +30,8 @@ init([]) -> shutdown => 1000, type => worker, modules => [emqx_banned]}, - FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000), Flapping = #{id => flapping, - start => {emqx_flapping, start_link, [FlappingOption]}, + start => {emqx_flapping, start_link, []}, restart => permanent, shutdown => 1000, type => worker, diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 4746175b3..09bab3e58 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -18,6 +18,8 @@ -include("logger.hrl"). +-logger_header("[Ctl]"). + -export([start_link/0]). -export([ register_command/2 @@ -79,7 +81,7 @@ run_command(Cmd, Args) when is_atom(Cmd) -> _ -> ok catch _:Reason:Stacktrace -> - ?ERROR("[Ctl] CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]), + ?ERROR("CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]), {error, Reason} end; [] -> @@ -107,14 +109,14 @@ init([]) -> {ok, #state{seq = 0}}. handle_call(Req, _From, State) -> - ?LOG(error, "[Ctl] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({register_command, Cmd, MF, Opts}, State = #state{seq = Seq}) -> case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of [] -> ets:insert(?TAB, {{Seq, Cmd}, MF, Opts}); [[OriginSeq] | _] -> - ?LOG(warning, "[Ctl] CMD ~s is overidden by ~p", [Cmd, MF]), + ?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]), ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts}) end, noreply(next_seq(State)); @@ -124,11 +126,11 @@ handle_cast({unregister_command, Cmd}, State) -> noreply(State); handle_cast(Msg, State) -> - ?LOG(error, "[Ctl] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), noreply(State). handle_info(Info, State) -> - ?LOG(error, "[Ctl] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), noreply(State). terminate(_Reason, _State) -> diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 099bf3910..2a5e66be1 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -19,7 +19,7 @@ -behaviour(gen_statem). --export([start_link/1]). +-export([start_link/0]). %% This module is used to garbage clean the flapping records @@ -33,6 +33,8 @@ -define(FLAPPING_TAB, ?MODULE). +-define(default_flapping_clean_interval, 3600000). + -export([check/3]). -record(flapping, @@ -96,11 +98,12 @@ check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval}, %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- --spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()). -start_link(TimerInterval) -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). +-spec(start_link() -> startlink_ret()). +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). -init([TimerInterval]) -> +init([]) -> + TimerInterval = emqx_config:get_env(flapping_clean_interval, ?default_flapping_clean_interval), TabOpts = [ public , set , {keypos, 2} diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 6d448d2f3..594aacbec 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Hooks]"). + -export([start_link/0, stop/0]). %% Hooks API @@ -181,7 +183,7 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat {reply, Reply, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Hooks] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({del, HookPoint, Action}, State) -> @@ -194,11 +196,11 @@ handle_cast({del, HookPoint, Action}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[Hooks] Unexpected msg: ~p", [Msg]), + ?LOG(error, "Unexpected msg: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Hooks] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index b77fa2d70..7f45843e3 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -48,6 +48,8 @@ , get_log_handler/1 ]). +-export([parse_transform/2]). + %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -120,6 +122,9 @@ set_log_level(Level) -> {error, Error} -> {error, {primary_logger_level, Error}} end. +parse_transform(AST, _Opts) -> + trans(AST, "", []). + %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ @@ -160,3 +165,27 @@ rollback([{ID, Level} | List]) -> rollback(List); rollback([]) -> ok. +%% Generate a function '$logger_header'/0 into the source code +trans([], LogHeader, ResAST) -> + lists:reverse([header_fun(LogHeader) | ResAST]); +trans([{eof, L} | AST], LogHeader, ResAST) -> + lists:reverse([{eof, L}, header_fun(LogHeader) | ResAST]) ++ AST; +trans([{attribute, _, module, _Mod} = M | AST], Header, ResAST) -> + trans(AST, Header, [export_header_fun(), M | ResAST]); +trans([{attribute, _, logger_header, Header} | AST], _, ResAST) -> + io_lib:printable_list(Header) orelse error({invalid_string, Header}), + trans(AST, Header, ResAST); +trans([F | AST], LogHeader, ResAST) -> + trans(AST, LogHeader, [F | ResAST]). + +export_header_fun() -> + {attribute,erl_anno:new(0),export,[{'$logger_header',0}]}. + +header_fun(LogHeader) -> + L = erl_anno:new(0), + {function,L,'$logger_header',0, + [{clause,L, + [], [], [{string,L,pad(LogHeader)}]}]}. + +pad("") -> ""; +pad(Str) -> Str ++ " ". diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index cf5939d38..fc04ab597 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -20,6 +20,8 @@ -include("types.hrl"). -include("emqx_mqtt.hrl"). +-logger_header("[Metrics]"). + -export([ start_link/0 , stop/0 ]). @@ -130,6 +132,10 @@ {counter, 'messages.forward'} % Messages forward ]). +-define(MQTT_METRICS, [ + {counter, 'auth.mqtt.anonymous'} +]). + -record(state, {next_idx = 1}). -record(metric, {name, type, idx}). @@ -353,17 +359,17 @@ init([]) -> Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)}, true = ets:insert(?TAB, Metric), ok = counters:put(CRef, Idx, 0) - end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), + end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS), {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> - ?LOG(error, "[Metrics] Failed to create ~s:~s for index exceeded.", [Type, Name]), + ?LOG(error, "Failed to create ~s:~s for index exceeded.", [Type, Name]), {reply, {error, metric_index_exceeded}, State}; handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) -> case ets:lookup(?TAB, Name) of [#metric{idx = Idx}] -> - ?LOG(warning, "[Metrics] ~s already exists.", [Name]), + ?LOG(warning, "~s already exists.", [Name]), {reply, {ok, Idx}, State}; [] -> Metric = #metric{name = Name, type = Type, idx = NextIdx}, @@ -372,15 +378,15 @@ handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) -> end; handle_call(Req, _From, State) -> - ?LOG(error, "[Metrics] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Metrics] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Metrics] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -444,4 +450,5 @@ reserved_idx('messages.retained') -> 48; reserved_idx('messages.dropped') -> 49; reserved_idx('messages.expired') -> 50; reserved_idx('messages.forward') -> 51; +reserved_idx('auth.mqtt.anonymous') -> 52; reserved_idx(_) -> undefined. diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 953666f31..9c88c9ea2 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[ACL_INTERNAL]"). + %% APIs -export([ all_rules/0 , check_acl/5 @@ -99,7 +101,7 @@ rules_from_file(AclFile) -> #{publish => [Rule || Rule <- Rules, filter(publish, Rule)], subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]}; {error, Reason} -> - ?LOG(alert, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), + ?LOG(alert, "Failed to read ~s: ~p", [AclFile, Reason]), #{} end. diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 9789474d7..ef164102d 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Presence]"). + %% APIs -export([ on_client_connected/4 , on_client_disconnected/3 @@ -49,23 +51,23 @@ on_client_connected(#{client_id := ClientId, username => Username, ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), connack => ConnAck, - ts => os:system_time(second) + ts => erlang:system_time(millisecond) }) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> - ?LOG(error, "[Presence] Encoding connected event error: ~p", [Reason]) + ?LOG(error, "Encoding connected event error: ~p", [Reason]) end. on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) -> case emqx_json:safe_encode([{clientid, ClientId}, {username, Username}, {reason, reason(Reason)}, - {ts, os:system_time(second)}]) of + {ts, erlang:system_time(millisecond)}]) of {ok, Payload} -> emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload)); {error, Reason} -> - ?LOG(error, "[Presence] Encoding disconnected event error: ~p", [Reason]) + ?LOG(error, "Encoding disconnected event error: ~p", [Reason]) end. unload(_Env) -> diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index f9d74dc81..79e2c02b4 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -16,6 +16,8 @@ -include("logger.hrl"). +-logger_header("[Modules]"). + -export([ load/0 , unload/0 ]). @@ -26,7 +28,7 @@ load() -> lists:foreach( fun({Mod, Env}) -> ok = Mod:load(Env), - ?LOG(info, "[Modules] Load ~s module successfully.", [Mod]) + ?LOG(info, "Load ~s module successfully.", [Mod]) end, emqx_config:get_env(modules, [])). -spec(unload() -> ok). diff --git a/src/emqx_mountpoint.erl b/src/emqx_mountpoint.erl index b98348395..66b8da057 100644 --- a/src/emqx_mountpoint.erl +++ b/src/emqx_mountpoint.erl @@ -17,6 +17,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Mountpoint]"). + -export([ mount/2 , unmount/2 ]). @@ -46,7 +48,7 @@ unmount(MountPoint, Msg = #message{topic = Topic}) -> {MountPoint, Topic1} -> Msg#message{topic = Topic1} catch _Error:Reason -> - ?LOG(error, "[Mountpoint] Unmount error : ~p", [Reason]), + ?LOG(error, "Unmount error : ~p", [Reason]), Msg end. diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index ff38f687e..bde3e3eb5 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -24,6 +24,8 @@ , get_caps/2 ]). +-export([default_caps/0]). + -type(caps() :: #{max_packet_size => integer(), max_clientid_len => integer(), max_topic_alias => integer(), @@ -36,6 +38,7 @@ -export_type([caps/0]). -define(UNLIMITED, 0). + -define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE}, {max_clientid_len, ?MAX_CLIENTID_LEN}, {max_topic_alias, ?UNLIMITED}, @@ -119,6 +122,9 @@ check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) -> _ -> check_sub(Topic, Opts, Caps) end. +default_caps() -> + ?DEFAULT_CAPS. + get_caps(Zone, publish) -> with_env(Zone, '$mqtt_pub_caps', fun() -> diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 2a8eb3ca3..21624c079 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -18,6 +18,8 @@ -include("logger.hrl"). +-logger_header("[OS Monitor]"). + -export([start_link/1]). %% gen_server callbacks @@ -45,6 +47,11 @@ -define(OS_MON, ?MODULE). +-define(compat_windows(Expression), case os:type() of + {win32, nt} -> windows; + _Unix -> Expression + end). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -93,7 +100,7 @@ set_procmem_high_watermark(Float) -> %%------------------------------------------------------------------------------ init([Opts]) -> - _ = cpu_sup:util(), + _ = ?compat_windows(cpu_sup:util()), set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)), set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)), set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)), @@ -124,16 +131,18 @@ handle_call(_Request, _From, State) -> handle_cast(_Request, State) -> {noreply, State}. -handle_info({timeout, Timer, check}, State = #{timer := Timer, +handle_info({timeout, Timer, check}, State = #{timer := Timer, cpu_high_watermark := CPUHighWatermark, cpu_low_watermark := CPULowWatermark, is_cpu_alarm_set := IsCPUAlarmSet}) -> - case cpu_sup:util() of + case ?compat_windows(cpu_sup:util()) of 0 -> {noreply, State#{timer := undefined}}; {error, Reason} -> - ?LOG(error, "[OS Monitor] Failed to get cpu utilization: ~p", [Reason]), + ?LOG(error, "Failed to get cpu utilization: ~p", [Reason]), {noreply, ensure_check_timer(State)}; + windows -> + {noreply, State}; Busy when Busy / 100 >= CPUHighWatermark -> alarm_handler:set_alarm({cpu_high_watermark, Busy}), {noreply, ensure_check_timer(State#{is_cpu_alarm_set := true})}; @@ -161,4 +170,3 @@ call(Req) -> ensure_check_timer(State = #{cpu_check_interval := Interval}) -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. - diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 1c1185bf0..cedb5c5b0 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -17,6 +17,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Plugins]"). + -export([init/0]). -export([ load/0 @@ -86,7 +88,7 @@ load_expand_plugin(PluginDir) -> end, Modules), case filelib:wildcard(Ebin ++ "/*.app") of [App|_] -> application:load(list_to_atom(filename:basename(App, ".app"))); - _ -> ?LOG(alert, "[Plugins] Plugin not found."), + _ -> ?LOG(alert, "Plugin not found."), {error, load_app_fail} end. @@ -112,7 +114,7 @@ with_loaded_file(File, SuccFun) -> Names = filter_plugins(Names0), SuccFun(Names); {error, Error} -> - ?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]), + ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]), {error, Error} end. @@ -126,7 +128,7 @@ load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), case NotFound of [] -> ok; - NotFound -> ?LOG(alert, "[Plugins] Cannot find plugins: ~p", [NotFound]) + NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound]) end, NeedToLoad = Names -- NotFound -- names(started_app), [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. @@ -149,20 +151,20 @@ stop_plugins(Names) -> -spec(list() -> [emqx_types:plugin()]). list() -> StartedApps = names(started_app), - lists:map(fun({Name, _, _}) -> - Plugin = plugin(Name), + lists:map(fun({Name, _, [Type| _]}) -> + Plugin = plugin(Name, Type), case lists:member(Name, StartedApps) of true -> Plugin#plugin{active = true}; false -> Plugin end end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))). -plugin(AppName) -> +plugin(AppName, Type) -> case application:get_all_key(AppName) of {ok, Attrs} -> Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), - #plugin{name = AppName, version = Ver, descr = Descr}; + #plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)}; undefined -> error({plugin_not_found, AppName}) end. @@ -171,12 +173,12 @@ plugin(AppName) -> load(PluginName) when is_atom(PluginName) -> case lists:member(PluginName, names(started_app)) of true -> - ?LOG(notice, "[Plugins] Plugin ~s is already started", [PluginName]), + ?LOG(notice, "Plugin ~s is already started", [PluginName]), {error, already_started}; false -> case find_plugin(PluginName) of false -> - ?LOG(alert, "[Plugins] Plugin ~s not found", [PluginName]), + ?LOG(alert, "Plugin ~s not found", [PluginName]), {error, not_found}; Plugin -> load_plugin(Plugin, true) @@ -204,12 +206,12 @@ load_app(App) -> start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> - ?LOG(info, "[Plugins] Started plugins: ~p", [Started]), - ?LOG(info, "[Plugins] Load plugin ~s successfully", [App]), + ?LOG(info, "Started plugins: ~p", [Started]), + ?LOG(info, "Load plugin ~s successfully", [App]), SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> - ?LOG(error, "[Plugins] Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]), + ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]), {error, {ErrApp, Reason}} end. @@ -226,10 +228,10 @@ unload(PluginName) when is_atom(PluginName) -> {true, true} -> unload_plugin(PluginName, true); {false, _} -> - ?LOG(error, "[Plugins] Plugin ~s is not started", [PluginName]), + ?LOG(error, "Plugin ~s is not started", [PluginName]), {error, not_started}; {true, false} -> - ?LOG(error, "[Plugins] ~s is not a plugin, cannot unload it", [PluginName]), + ?LOG(error, "~s is not a plugin, cannot unload it", [PluginName]), {error, not_found} end. @@ -244,11 +246,11 @@ unload_plugin(App, Persistent) -> stop_app(App) -> case application:stop(App) of ok -> - ?LOG(info, "[Plugins] Stop plugin ~s successfully", [App]), ok; + ?LOG(info, "Stop plugin ~s successfully", [App]), ok; {error, {not_started, App}} -> - ?LOG(error, "[Plugins] Plugin ~s is not started", [App]), ok; + ?LOG(error, "Plugin ~s is not started", [App]), ok; {error, Reason} -> - ?LOG(error, "[Plugins] Stop plugin ~s error: ~p", [App]), {error, Reason} + ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason} end. %%-------------------------------------------------------------------- @@ -276,7 +278,7 @@ plugin_loaded(Name, true) -> ignore end; {error, Error} -> - ?LOG(error, "[Plugins] Cannot read loaded plugins: ~p", [Error]) + ?LOG(error, "Cannot read loaded plugins: ~p", [Error]) end. plugin_unloaded(_Name, false) -> @@ -289,10 +291,10 @@ plugin_unloaded(Name, true) -> true -> write_loaded(lists:delete(Name, Names)); false -> - ?LOG(error, "[Plugins] Cannot find ~s in loaded_file", [Name]) + ?LOG(error, "Cannot find ~s in loaded_file", [Name]) end; {error, Error} -> - ?LOG(error, "[Plugins] Cannot read loaded_plugins: ~p", [Error]) + ?LOG(error, "Cannot read loaded_plugins: ~p", [Error]) end. read_loaded() -> @@ -311,6 +313,13 @@ write_loaded(AppNames) -> file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name]))) end, AppNames); {error, Error} -> - ?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]), + ?LOG(error, "Open File ~p Error: ~p", [File, Error]), {error, Error} end. + +plugin_type(auth) -> auth; +plugin_type(protocol) -> protocol; +plugin_type(backend) -> backend; +plugin_type(bridge) -> bridge; +plugin_type(_) -> feature. + diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index b78479f5c..5a6231ee0 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Pool]"). + %% APIs -export([start_link/2]). @@ -97,22 +99,22 @@ handle_call({submit, Task}, _From, State) -> {reply, catch run(Task), State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Pool] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({async_submit, Task}, State) -> try run(Task) catch _:Error:Stacktrace -> - ?LOG(error, "[Pool] Error: ~p, ~p", [Error, Stacktrace]) + ?LOG(error, "Error: ~p, ~p", [Error, Stacktrace]) end, {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[Pool] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Pool] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index b6ca03d7f..7d1ecc04e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -20,6 +20,8 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). +-logger_header("[Protocol]"). + -export([ info/1 , attrs/1 , attr/2 @@ -413,11 +415,11 @@ process(?CONNECT_PACKET( %% Success {?RC_SUCCESS, SP, PState4}; {error, Error} -> - ?LOG(error, "[Protocol] Failed to open session: ~p", [Error]), + ?LOG(error, "Failed to open session: ~p", [Error]), {?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}} end; {error, Reason} -> - ?LOG(warning, "[Protocol] Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]), + ?LOG(warning, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]), {emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}} end; {error, ReasonCode} -> @@ -429,7 +431,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState = # ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s", + ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState) @@ -440,7 +442,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState = #p ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), + ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({puback, PacketId, ReasonCode}, PState) of {ok, PState1} -> AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore), @@ -454,7 +456,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState = #p ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "[Protocol] Cannot publish qos2 message to ~s for ~s", + ?LOG(warning, "Cannot publish qos2 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({pubrec, PacketId, ReasonCode}, PState) of {ok, PState1} -> @@ -501,7 +503,7 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), ({Topic, #{rc := Code}}, {Topics, Codes}) -> {[Topic|Topics], [Code|Codes]} end, {[], []}, TopicFilters), - ?LOG(warning, "[Protocol] Cannot subscribe ~p for ~p", + ?LOG(warning, "Cannot subscribe ~p for ~p", [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]), case deliver({suback, PacketId, ReasonCodes}, PState) of {ok, PState1} -> @@ -844,7 +846,7 @@ check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, case do_acl_check(EnableAcl, publish, Credentials, WillTopic) of ok -> ok; Other -> - ?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]), + ?LOG(warning, "Cannot publish will message to ~p for acl denied", [WillTopic]), Other end. @@ -898,9 +900,9 @@ check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) -> end, {ok, []}, TopicFilters). trace(recv, Packet) -> - ?LOG(debug, "[Protocol] RECV ~s", [emqx_packet:format(Packet)]); + ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]); trace(send, Packet) -> - ?LOG(debug, "[Protocol] SEND ~s", [emqx_packet:format(Packet)]). + ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]). inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) -> PState#pstate{recv_stats = inc_stats(Type, Stats)}; @@ -926,7 +928,7 @@ terminate(Reason, PState) when Reason =:= conflict; terminate(Reason, PState = #pstate{credentials = Credentials}) -> do_flapping_detect(disconnect, PState), - ?LOG(info, "[Protocol] Shutdown for ~p", [Reason]), + ?LOG(info, "Shutdown for ~p", [Reason]), ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). start_keepalive(0, _PState) -> @@ -961,7 +963,7 @@ do_flapping_detect(Action, #pstate{zone = Zone, Threshold = emqx_zone:get_env(Zone, flapping_threshold, {10, 60}), case emqx_flapping:check(Action, ClientId, Threshold) of flapping -> - BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), + BanExpiryInterval = emqx_zone:get_env(Zone, flapping_banned_expiry_interval, 3600000), Until = erlang:system_time(second) + BanExpiryInterval, emqx_banned:add(#banned{who = {client_id, ClientId}, reason = <<"flapping">>, diff --git a/src/emqx_psk.erl b/src/emqx_psk.erl index ae2f8935b..cb8835e78 100644 --- a/src/emqx_psk.erl +++ b/src/emqx_psk.erl @@ -16,6 +16,8 @@ -include("logger.hrl"). +-logger_header("[PSK]"). + %% SSL PSK Callbacks -export([lookup/3]). @@ -27,10 +29,10 @@ lookup(psk, ClientPSKID, _UserState) -> try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret}; Error -> - ?LOG(error, "[PSK] Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]), + ?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]), error catch Except:Error:Stacktrace -> - ?LOG(error, "[PSK] Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), + ?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), error end. \ No newline at end of file diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 8eb65169c..aefe61667 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -21,6 +21,8 @@ -include("types.hrl"). -include_lib("ekka/include/ekka.hrl"). +-logger_header("[Router]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -198,15 +200,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Router] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Router] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Router] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index f272b1236..aac228f66 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Router Helper]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -103,11 +105,11 @@ init([]) -> {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> - ?LOG(error, "[Router Helper] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Router Helper] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) -> @@ -123,7 +125,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) -> {noreply, State}; handle_info({mnesia_table_event, Event}, State) -> - ?LOG(error, "[Router Helper] Unexpected mnesia_table_event: ~p", [Event]), + ?LOG(error, "Unexpected mnesia_table_event: ~p", [Event]), {noreply, State}; handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> @@ -141,7 +143,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "[Route Helper] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 85d2c1947..49454d8ac 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -47,6 +47,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Session]"). + -export([start_link/1]). -export([ info/1 @@ -399,11 +401,11 @@ handle_call(stats, _From, State) -> reply(stats(State), State); handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) -> - ?LOG(warning, "[Session] Discarded by ~p", [ByPid]), + ?LOG(warning, "Discarded by ~p", [ByPid]), {stop, {shutdown, discarded}, ok, State}; handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> - ?LOG(warning, "[Session] Conn ~p is discarded by ~p", [ConnPid, ByPid]), + ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]), ConnPid ! {shutdown, discard, {ClientId, ByPid}}, {stop, {shutdown, discarded}, ok, State}; @@ -423,7 +425,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From, {ok, ensure_stats_timer(ensure_await_rel_timer(State1))} end; true -> - ?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), + ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), ok = emqx_metrics:inc('messages.qos2.dropped'), {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} end); @@ -435,7 +437,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In true -> {ok, ensure_stats_timer(acked(pubrec, PacketId, State))}; false -> - ?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]), + ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId]), ok = emqx_metrics:inc('packets.pubrec.missed'), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -447,7 +449,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel {_Ts, AwaitingRel1} -> {ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})}; error -> - ?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]), + ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.pubrel.missed'), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -456,7 +458,7 @@ handle_call(close, _From, State) -> {stop, normal, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Session] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. %% SUBSCRIBE: @@ -497,7 +499,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight} true -> ensure_stats_timer(dequeue(acked(puback, PacketId, State))); false -> - ?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]), + ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), State end); @@ -509,7 +511,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight true -> ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State))); false -> - ?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]), + ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.pubcomp.missed'), State end); @@ -527,14 +529,14 @@ handle_cast({resume, #{conn_pid := ConnPid, expiry_timer = ExpireTimer, will_delay_timer = WillDelayTimer}) -> - ?LOG(info, "[Session] Resumed by connection ~p ", [ConnPid]), + ?LOG(info, "Resumed by connection ~p ", [ConnPid]), %% Cancel Timers lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]), case kick(ClientId, OldConnPid, ConnPid) of - ok -> ?LOG(warning, "[Session] Connection ~p kickout ~p", [ConnPid, OldConnPid]); + ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid]); ignore -> ok end, @@ -565,7 +567,7 @@ handle_cast({update_expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; handle_cast(Msg, State) -> - ?LOG(error, "[Session] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) -> @@ -600,12 +602,12 @@ handle_info({timeout, Timer, emit_stats}, GcState1 = emqx_gc:reset(GcState), {noreply, NewState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> - ?LOG(warning, "[Session] Shutdown exceptionally due to ~p", [Reason]), + ?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]), shutdown(Reason, NewState) end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> - ?LOG(info, "[Session] Expired, shutdown now.", []), + ?LOG(info, "Expired, shutdown now.", []), shutdown(expired, State); handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) -> @@ -640,12 +642,12 @@ handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> {noreply, State#state{old_conn_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> - ?LOG(error, "[Session] Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", + ?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", [ConnPid, Pid, Reason]), {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "[Session] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(Reason, #state{will_msg = WillMsg, @@ -771,7 +773,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, case (timer:now_diff(Now, Ts) div 1000) of Age when Age >= Timeout -> ok = emqx_metrics:inc('messages.qos2.expired'), - ?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]), + ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]), expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Age -> ensure_await_rel_timer(Timeout - max(0, Age), State) @@ -981,7 +983,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - ?LOG(warning, "[Session] Duplicated PUBACK PacketId ~w", [PacketId]), + ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]), State end; @@ -991,10 +993,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> - ?LOG(warning, "[Session] Duplicated PUBREC PacketId ~w", [PacketId]), + ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]), State; none -> - ?LOG(warning, "[Session] Unexpected PUBREC PacketId ~w", [PacketId]), + ?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId]), State end; diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index ad721d5fd..b271bd85d 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Session Supervisor]"). + -export([start_link/1]). -export([ start_session/1 @@ -92,7 +94,7 @@ handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From, reply({error, Reason}, State) catch _:Error:Stk -> - ?LOG(error, "[Session Supervisor] Failed to start session ~p: ~p, stacktrace:~n~p", + ?LOG(error, "Failed to start session ~p: ~p, stacktrace:~n~p", [ClientId, Error, Stk]), reply({error, Error}, State) end; @@ -101,11 +103,11 @@ handle_call(count_sessions, _From, State = #state{sessions = SessMap}) -> {reply, maps:size(SessMap), State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Session Supervisor] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Session Supervisor] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) -> @@ -117,7 +119,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow {noreply, State#state{sessions = SessMap1}}; handle_info(Info, State) -> - ?LOG(notice, "[Session Supervisor] Unexpected info: ~p", [Info]), + ?LOG(notice, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, State) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 48eba293a..3b0d48189 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -21,6 +21,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Shared Sub]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -310,11 +312,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Shared Sub] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Shared Sub] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> @@ -329,12 +331,12 @@ handle_info({mnesia_table_event, _Event}, State) -> {noreply, State}; handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> - ?LOG(info, "[Shared Sub] Shared subscriber down: ~p", [SubPid]), + ?LOG(info, "Shared subscriber down: ~p", [SubPid]), cleanup_down(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; handle_info(Info, State) -> - ?LOG(error, "[Shared Sub] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index a122a05fe..256425f5a 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[SM]"). + %% APIs -export([start_link/0]). @@ -114,7 +116,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> try emqx_session:discard(SessPid, ConnPid) catch _:Error:_Stk -> - ?LOG(warning, "[SM] Failed to discard ~p: ~p", [SessPid, Error]) + ?LOG(warning, "Failed to discard ~p: ~p", [SessPid, Error]) end end, lookup_session_pids(ClientId)). @@ -128,7 +130,7 @@ resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) -> {ok, SessPid}; SessPids -> [SessPid|StalePids] = lists:reverse(SessPids), - ?LOG(error, "[SM] More than one session found: ~p", [SessPids]), + ?LOG(error, "More than one session found: ~p", [SessPids]), lists:foreach(fun(StalePid) -> catch emqx_session:discard(StalePid, ConnPid) end, StalePids), @@ -254,15 +256,15 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?LOG(error, "[SM] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[SM] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[SM] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index ccacc9907..5e8d4663d 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Registry]"). + -export([start_link/0]). -export([ is_enabled/0 @@ -96,11 +98,11 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?LOG(error, "[Registry] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Registry] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State) -> @@ -114,7 +116,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(Info, State) -> - ?LOG(error, "[Registry] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index c75bd742d..a095df8d8 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Stats]"). + %% APIs -export([ start_link/0 , start_link/1 @@ -184,7 +186,7 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Stats] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({setstat, Stat, MaxStat, Val}, State) -> @@ -202,7 +204,7 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) -> handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) -> case lists:keyfind(Name, #update.name, Updates) of #update{} -> - ?LOG(warning, "[Stats] Duplicated update: ~s", [Name]), + ?LOG(warning, "Duplicated update: ~s", [Name]), {noreply, State}; false -> {noreply, State#state{updates = [Update | Updates]}} @@ -212,7 +214,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) -> {noreply, State#state{updates = lists:keydelete(Name, #update.name, Updates)}}; handle_cast(Msg, State) -> - ?LOG(error, "[Stats] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) -> @@ -222,7 +224,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update try UpFun() catch _:Error -> - ?LOG(error, "[Stats] update ~s failed: ~p", [Name, Error]) + ?LOG(error, "update ~s failed: ~p", [Name, Error]) end, [Update#update{countdown = I} | Acc]; (Update = #update{countdown = C}, Acc) -> @@ -231,7 +233,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update {noreply, start_timer(State#state{updates = Updates1}), hibernate}; handle_info(Info, State) -> - ?LOG(error, "[Stats] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> @@ -252,5 +254,5 @@ safe_update_element(Key, Val) -> true catch error:badarg -> - ?LOG(warning, "[Stats] Update ~p to ~p failed", [Key, Val]) + ?LOG(warning, "Update ~p to ~p failed", [Key, Val]) end. diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 90b24e122..37e22856f 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[SYS]"). + -export([start_link/0]). -export([ version/0 @@ -117,11 +119,11 @@ handle_call(uptime, _From, State) -> {reply, uptime(State), State}; handle_call(Req, _From, State) -> - ?LOG(error, "[SYS] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[SYS] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> @@ -138,7 +140,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi {noreply, tick(State), hibernate}; handle_info(Info, State) -> - ?LOG(error, "[SYS] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index b75503b53..f66c2a0c5 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -19,6 +19,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[SYSMON]"). + -export([start_link/1]). %% compress unused warning @@ -88,18 +90,18 @@ parse_opt([_Opt|Opts], Acc) -> parse_opt(Opts, Acc). handle_call(Req, _From, State) -> - ?LOG(error, "[SYSMON] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[SYSMON] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({monitor, Pid, long_gc, Info}, State) -> suppress({long_gc, Pid}, fun() -> WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(long_gc, WarnMsg) end, State); @@ -107,7 +109,7 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> suppress({long_schedule, Pid}, fun() -> WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(long_schedule, WarnMsg) end, State); @@ -115,7 +117,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> suppress({long_schedule, Port}, fun() -> WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), - ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]), safe_publish(long_schedule, WarnMsg) end, State); @@ -123,7 +125,7 @@ handle_info({monitor, Pid, large_heap, Info}, State) -> suppress({large_heap, Pid}, fun() -> WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), - ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(large_heap, WarnMsg) end, State); @@ -131,7 +133,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) -> suppress({busy_port, Port}, fun() -> WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), safe_publish(busy_port, WarnMsg) end, State); @@ -139,7 +141,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> suppress({busy_dist_port, Port}, fun() -> WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), safe_publish(busy_dist_port, WarnMsg) end, State); @@ -147,7 +149,7 @@ handle_info({timeout, _Ref, reset}, State) -> {noreply, State#{events := []}, hibernate}; handle_info(Info, State) -> - ?LOG(error, "[SYSMON] Unexpected Info: ~p", [Info]), + ?LOG(error, "Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{timer := TRef}) -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index c85aa5007..21562930b 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -19,6 +19,8 @@ -include("emqx.hrl"). -include("logger.hrl"). +-logger_header("[Tracer]"). + %% APIs -export([start_link/0]). @@ -121,10 +123,10 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T filters => [{meta_key_filter, {fun filter_by_meta_key/2, Who} }]}) of ok -> - ?LOG(info, "[Tracer] Start trace for ~p", [Who]), + ?LOG(info, "Start trace for ~p", [Who]), {reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}}; {error, Reason} -> - ?LOG(error, "[Tracer] Start trace for ~p failed, error: ~p", [Who, Reason]), + ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]), {reply, {error, Reason}, State} end; @@ -133,9 +135,9 @@ handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) -> {ok, _LogFile} -> case logger:remove_handler(handler_id(Who)) of ok -> - ?LOG(info, "[Tracer] Stop trace for ~p", [Who]); + ?LOG(info, "Stop trace for ~p", [Who]); {error, Reason} -> - ?LOG(error, "[Tracer] Stop trace for ~p failed, error: ~p", [Who, Reason]) + ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]) end, {reply, ok, State#state{traces = maps:remove(Who, Traces)}}; error -> @@ -146,15 +148,15 @@ handle_call(lookup_traces, _From, State = #state{traces = Traces}) -> {reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Tracer] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Tracer] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Tracer] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index b20ad3478..771883be2 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -20,6 +20,8 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). +-logger_header("[WS Channel]"). + -export([ info/1 , attrs/1 , stats/1 @@ -143,11 +145,11 @@ websocket_init(#state{request = Req, options = Options}) -> WsCookie = try cowboy_req:parse_cookies(Req) catch error:badarg -> - ?LOG(error, "[WS Channel] Illegal cookie"), + ?LOG(error, "Illegal cookie"), undefined; Error:Reason -> ?LOG(error, - "[WS Channel] Cookie is parsed failed, Error: ~p, Reason ~p", + "Cookie is parsed failed, Error: ~p, Reason ~p", [Error, Reason]), undefined end, @@ -189,7 +191,7 @@ websocket_handle({binary, <<>>}, State) -> websocket_handle({binary, [<<>>]}, State) -> {ok, ensure_stats_timer(State)}; websocket_handle({binary, Data}, State = #state{parse_state = ParseState}) -> - ?LOG(debug, "[WS Channel] RECV ~p", [Data]), + ?LOG(debug, "RECV ~p", [Data]), BinSize = iolist_size(Data), emqx_pd:update_counter(recv_oct, BinSize), ok = emqx_metrics:inc('bytes.received', BinSize), @@ -204,11 +206,11 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState}) -> end, State#state{parse_state = NParseState}); {error, Reason} -> - ?LOG(error, "[WS Channel] Frame error: ~p", [Reason]), + ?LOG(error, "Frame error: ~p", [Reason]), shutdown(Reason, State) catch error:Reason:Stk -> - ?LOG(error, "[WS Channel] Parse failed for ~p~n\ + ?LOG(error, "Parse failed for ~p~n\ Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), shutdown(parse_error, State) end; @@ -219,7 +221,11 @@ websocket_handle(Frame, State) {ok, ensure_stats_timer(State)}; websocket_handle({FrameType, _}, State) when FrameType =:= ping; FrameType =:= pong -> - {ok, ensure_stats_timer(State)}. + {ok, ensure_stats_timer(State)}; +%% According to mqtt spec[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901285] +websocket_handle({_OtherFrameType, _}, State) -> + ?LOG(error, "Frame error: Other type of data frame"), + shutdown(other_frame_type, State). websocket_info({call, From, info}, State) -> gen_server:reply(From, info(State)), @@ -255,12 +261,12 @@ websocket_info({timeout, Timer, emit_stats}, {ok, State#state{stats_timer = undefined}, hibernate}; websocket_info({keepalive, start, Interval}, State) -> - ?LOG(debug, "[WS Channel] Keepalive at the interval of ~p", [Interval]), + ?LOG(debug, "Keepalive at the interval of ~p", [Interval]), case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> - ?LOG(warning, "[WS Channel] Keepalive error: ~p", [Error]), + ?LOG(warning, "Keepalive error: ~p", [Error]), shutdown(Error, State) end; @@ -269,19 +275,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> - ?LOG(debug, "[WS Channel] Keepalive Timeout!"), + ?LOG(debug, "Keepalive Timeout!"), shutdown(keepalive_timeout, State); {error, Error} -> - ?LOG(error, "[WS Channel] Keepalive error: ~p", [Error]), + ?LOG(error, "Keepalive error: ~p", [Error]), shutdown(keepalive_error, State) end; websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(warning, "[WS Channel] Discarded by ~s:~p", [ClientId, ByPid]), + ?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "[WS Channel] Clientid '~s' conflict with ~p", [ClientId, NewPid]), + ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); websocket_info({binary, Data}, State) -> @@ -294,13 +300,13 @@ websocket_info({stop, Reason}, State) -> {stop, State#state{shutdown = Reason}}; websocket_info(Info, State) -> - ?LOG(error, "[WS Channel] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {ok, State}. terminate(SockError, _Req, #state{keepalive = Keepalive, proto_state = ProtoState, shutdown = Shutdown}) -> - ?LOG(debug, "[WS Channel] Terminated for ~p, sockerror: ~p", + ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Shutdown, SockError]), emqx_keepalive:cancel(Keepalive), case {ProtoState, Shutdown} of @@ -320,7 +326,7 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> {ok, NProtoState} -> SuccFun(State#state{proto_state = NProtoState}); {error, Reason} -> - ?LOG(error, "[WS Channel] Protocol error: ~p", [Reason]), + ?LOG(error, "Protocol error: ~p", [Reason]), shutdown(Reason, State); {error, Reason, NProtoState} -> shutdown(Reason, State#state{proto_state = NProtoState}); diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 65deeea24..d654487c5 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -20,6 +20,8 @@ -include("logger.hrl"). -include("types.hrl"). +-logger_header("[Zone]"). + %% APIs -export([start_link/0]). @@ -96,7 +98,7 @@ handle_call(force_reload, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[Zone] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({set_env, Zone, Key, Val}, State) -> @@ -104,11 +106,11 @@ handle_cast({set_env, Zone, Key, Val}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[Zone] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[Zone] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index d57094925..9be887ce3 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -36,8 +36,6 @@ all() -> groups() -> [{access_control, [sequence], [reload_acl, - register_mod, - unregister_mod, check_acl_1, check_acl_2]}, {access_control_cache_mode, [], @@ -98,58 +96,26 @@ write_config(Filename, Terms) -> end_per_group(_Group, Config) -> Config. -init_per_testcase(_TestCase, Config) -> - ?AC:start_link(), - Config. -end_per_testcase(_TestCase, _Config) -> - ok. - -per_testcase_config(acl_cache_full, Config) -> - Config; -per_testcase_config(_TestCase, Config) -> - Config. - %%-------------------------------------------------------------------- %% emqx_access_control %%-------------------------------------------------------------------- reload_acl(_) -> - [ok] = ?AC:reload_acl(). - -register_mod(_) -> - ok = ?AC:register_mod(acl, emqx_acl_test_mod, []), - {emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)), - ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]), - ok = ?AC:register_mod(auth, emqx_auth_dashboard, [], 99), - [{emqx_auth_dashboard, _, 99}, - {emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth). - -unregister_mod(_) -> - ok = ?AC:register_mod(acl, emqx_acl_test_mod, []), - {emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)), - ok = ?AC:unregister_mod(acl, emqx_acl_test_mod), - timer:sleep(5), - {emqx_acl_internal, _, 0}= hd(?AC:lookup_mods(acl)), - ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]), - [{emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth), - - ok = ?AC:unregister_mod(auth, emqx_auth_anonymous_test_mod), - timer:sleep(5), - [] = ?AC:lookup_mods(auth). + ok = ?AC:reload_acl(). check_acl_1(_) -> - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), + deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). check_acl_2(_) -> - SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>}, + SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>, zone => external}, deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). acl_cache_basic(_) -> - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), @@ -162,7 +128,7 @@ acl_cache_basic(_) -> acl_cache_expiry(_) -> application:set_env(emqx, acl_cache_ttl, 100), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ct:sleep(150), @@ -172,7 +138,7 @@ acl_cache_expiry(_) -> acl_cache_full(_) -> application:set_env(emqx, acl_cache_max_size, 1), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), @@ -187,7 +153,7 @@ acl_cache_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 2), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), @@ -357,8 +323,8 @@ compile_rule(_) -> {deny, all} = compile({deny, all}). match_rule(_) -> - User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}}, - User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}}, + User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}, zone => external}, + User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}, zone => external}, {matched, allow} = match(User, <<"Test/Topic">>, {allow, all}), {matched, deny} = match(User, <<"Test/Topic">>, {deny, all}), diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index 25a8303e6..1e7223320 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -62,8 +62,8 @@ t_alarm_handler(_) -> {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), - Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>), - Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>), + Topic1 = emqx_topic:systop(<<"alarms/alert">>), + Topic2 = emqx_topic:systop(<<"alarms/clear">>), SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}, emqx_client_sock:send(Sock, raw_send_serialize( diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl index 8ac76bfd9..e189ed69d 100644 --- a/test/emqx_mqtt_packet_SUITE.erl +++ b/test/emqx_mqtt_packet_SUITE.erl @@ -68,11 +68,11 @@ groups() -> [{connect, [sequence], ]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). init_per_group(_Group, Config) -> Config. @@ -85,7 +85,7 @@ case1_protocol_name(_) -> MqttPacket = serialize(?CASE1_PROTOCOL_NAME), emqx_client_sock:send(Sock, MqttPacket), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data), Disconnect = gen_tcp:recv(Sock, 0), ?assertEqual({error, closed}, Disconnect). @@ -95,7 +95,7 @@ case2_protocol_ver(_) -> emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), %% case1 Unacceptable protocol version - {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data), Disconnect = gen_tcp:recv(Sock, 0), ?assertEqual({error, closed}, Disconnect). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 60d298008..ec707e30d 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -55,7 +55,7 @@ groups() -> init_per_suite(Config) -> emqx_ct_helpers:start_apps([], fun set_special_configs/1), - MqttCaps = emqx_zone:get_env(external, '$mqtt_caps'), + MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()), emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}), Config. diff --git a/test/emqx_request_response_SUITE.erl b/test/emqx_request_response_SUITE.erl index 6709e958e..373619f1d 100644 --- a/test/emqx_request_response_SUITE.erl +++ b/test/emqx_request_response_SUITE.erl @@ -22,10 +22,11 @@ -include_lib("common_test/include/ct.hrl"). init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). + emqx_ct_helpers:start_apps([]), + Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). all() -> [request_response]. diff --git a/test/emqx_ws_channel_SUITE.erl b/test/emqx_ws_channel_SUITE.erl index 29f02e653..ada11a95a 100644 --- a/test/emqx_ws_channel_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -31,6 +31,7 @@ all() -> [ t_ws_connect_api , t_ws_auth_failure + , t_ws_other_type_frame ]. init_per_suite(Config) -> @@ -71,6 +72,18 @@ t_ws_connect_api(_Config) -> {close, _} = rfc6455_client:close(WS), ok. +t_ws_other_type_frame(_Config) -> + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + ok = rfc6455_client:send_binary(WS, raw_send_serialize(?CLIENT)), + {binary, Bin} = rfc6455_client:recv(WS), + Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), + {ok, Connack, <<>>, _} = raw_recv_pase(Bin), + rfc6455_client:send(WS, <<"testdata">>), + timer:sleep(1000), + ?assertEqual(undefined, erlang:process_info(WS)), + ok. + raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). diff --git a/test/run_emqx.escript b/test/run_emqx.escript new file mode 100644 index 000000000..e8d1e2aae --- /dev/null +++ b/test/run_emqx.escript @@ -0,0 +1,13 @@ +#!/usr/bin/env escript + +main(_) -> + start(). + +start() -> + SpecEmqxConfig = fun(_) -> ok end, + start(SpecEmqxConfig). + +start(SpecEmqxConfig) -> + SchemaPath = filename:join(["priv", "emqx.schema"]), + ConfPath = filename:join(["etc", "emqx.conf"]), + emqx_ct_helpers:start_app(emqx, SchemaPath, ConfPath, SpecEmqxConfig).