From bcae452e42ec12382f36ccff80ec4ef9b0c0ad9e Mon Sep 17 00:00:00 2001 From: GilbertWong Date: Sat, 15 Jun 2019 17:07:46 +0800 Subject: [PATCH 1/3] Fix the flapping bug Prior to this change, the banned until value will not be set correctly because of wrong spell of config entry name . This change fix this bug --- rebar.config | 6 +++--- src/emqx_cm_sup.erl | 3 +-- src/emqx_flapping.erl | 13 ++++++++----- src/emqx_protocol.erl | 2 +- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/rebar.config b/rebar.config index 807c3cab1..bd017de97 100644 --- a/rebar.config +++ b/rebar.config @@ -2,10 +2,10 @@ [ {jsx, "2.9.0"} % hex , {cowboy, "2.6.1"} % hex , {gproc, "0.8.0"} % hex + , {ekka, "0.5.6"} % hex + , {replayq, "0.1.1"} %hex + , {esockd, "5.5.0"} %hex , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.5"}}} - , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}} - , {esockd, "5.5.0"} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 19940da05..e054c53e7 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -30,9 +30,8 @@ init([]) -> shutdown => 1000, type => worker, modules => [emqx_banned]}, - FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000), Flapping = #{id => flapping, - start => {emqx_flapping, start_link, [FlappingOption]}, + start => {emqx_flapping, start_link, []}, restart => permanent, shutdown => 1000, type => worker, diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 099bf3910..2a5e66be1 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -19,7 +19,7 @@ -behaviour(gen_statem). --export([start_link/1]). +-export([start_link/0]). %% This module is used to garbage clean the flapping records @@ -33,6 +33,8 @@ -define(FLAPPING_TAB, ?MODULE). +-define(default_flapping_clean_interval, 3600000). + -export([check/3]). -record(flapping, @@ -96,11 +98,12 @@ check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval}, %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- --spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()). -start_link(TimerInterval) -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). +-spec(start_link() -> startlink_ret()). +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). -init([TimerInterval]) -> +init([]) -> + TimerInterval = emqx_config:get_env(flapping_clean_interval, ?default_flapping_clean_interval), TabOpts = [ public , set , {keypos, 2} diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 6eb1bb233..7d1ecc04e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -963,7 +963,7 @@ do_flapping_detect(Action, #pstate{zone = Zone, Threshold = emqx_zone:get_env(Zone, flapping_threshold, {10, 60}), case emqx_flapping:check(Action, ClientId, Threshold) of flapping -> - BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), + BanExpiryInterval = emqx_zone:get_env(Zone, flapping_banned_expiry_interval, 3600000), Until = erlang:system_time(second) + BanExpiryInterval, emqx_banned:add(#banned{who = {client_id, ClientId}, reason = <<"flapping">>, From 1d23d7de86d40e0e38df3ce0a118262bf1557578 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Thu, 20 Jun 2019 11:44:44 +0800 Subject: [PATCH 2/3] Optimize test workflow (#2642) * Optimize test suites workflow * Better format of makefile * Fix protocol suite * Add emqx_access_SUITE --- Makefile | 17 ++++----- src/emqx_mqtt_caps.erl | 6 ++++ test/emqx_access_SUITE.erl | 54 ++++++---------------------- test/emqx_mqtt_packet_SUITE.erl | 8 ++--- test/emqx_protocol_SUITE.erl | 2 +- test/emqx_request_response_SUITE.erl | 5 +-- 6 files changed, 31 insertions(+), 61 deletions(-) diff --git a/Makefile b/Makefile index 771fb9d96..14790e0e9 100644 --- a/Makefile +++ b/Makefile @@ -3,20 +3,17 @@ REBAR_GIT_CLONE_OPTIONS += --depth 1 export REBAR_GIT_CLONE_OPTIONS -# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ - emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ - emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ - emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_channel \ - emqx_packet emqx_channel emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ - emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping +SUITES_FILES := $(shell find test -name '*_SUITE.erl') + +CT_SUITES := $(foreach value,$(SUITES_FILES),$(shell val=$$(basename $(value) .erl); echo $${val%_*})) CT_NODE_NAME = emqxct@127.0.0.1 +.PHONY: cover +run: + @echo $(CT_TEST_SUITES) + compile: @rebar3 compile diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index ff38f687e..bde3e3eb5 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -24,6 +24,8 @@ , get_caps/2 ]). +-export([default_caps/0]). + -type(caps() :: #{max_packet_size => integer(), max_clientid_len => integer(), max_topic_alias => integer(), @@ -36,6 +38,7 @@ -export_type([caps/0]). -define(UNLIMITED, 0). + -define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE}, {max_clientid_len, ?MAX_CLIENTID_LEN}, {max_topic_alias, ?UNLIMITED}, @@ -119,6 +122,9 @@ check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) -> _ -> check_sub(Topic, Opts, Caps) end. +default_caps() -> + ?DEFAULT_CAPS. + get_caps(Zone, publish) -> with_env(Zone, '$mqtt_pub_caps', fun() -> diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index d57094925..9be887ce3 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -36,8 +36,6 @@ all() -> groups() -> [{access_control, [sequence], [reload_acl, - register_mod, - unregister_mod, check_acl_1, check_acl_2]}, {access_control_cache_mode, [], @@ -98,58 +96,26 @@ write_config(Filename, Terms) -> end_per_group(_Group, Config) -> Config. -init_per_testcase(_TestCase, Config) -> - ?AC:start_link(), - Config. -end_per_testcase(_TestCase, _Config) -> - ok. - -per_testcase_config(acl_cache_full, Config) -> - Config; -per_testcase_config(_TestCase, Config) -> - Config. - %%-------------------------------------------------------------------- %% emqx_access_control %%-------------------------------------------------------------------- reload_acl(_) -> - [ok] = ?AC:reload_acl(). - -register_mod(_) -> - ok = ?AC:register_mod(acl, emqx_acl_test_mod, []), - {emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)), - ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]), - ok = ?AC:register_mod(auth, emqx_auth_dashboard, [], 99), - [{emqx_auth_dashboard, _, 99}, - {emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth). - -unregister_mod(_) -> - ok = ?AC:register_mod(acl, emqx_acl_test_mod, []), - {emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)), - ok = ?AC:unregister_mod(acl, emqx_acl_test_mod), - timer:sleep(5), - {emqx_acl_internal, _, 0}= hd(?AC:lookup_mods(acl)), - ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]), - [{emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth), - - ok = ?AC:unregister_mod(auth, emqx_auth_anonymous_test_mod), - timer:sleep(5), - [] = ?AC:lookup_mods(auth). + ok = ?AC:reload_acl(). check_acl_1(_) -> - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), - deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), + deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>), allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). check_acl_2(_) -> - SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>}, + SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>, zone => external}, deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>). acl_cache_basic(_) -> - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), @@ -162,7 +128,7 @@ acl_cache_basic(_) -> acl_cache_expiry(_) -> application:set_env(emqx, acl_cache_ttl, 100), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), ct:sleep(150), @@ -172,7 +138,7 @@ acl_cache_expiry(_) -> acl_cache_full(_) -> application:set_env(emqx, acl_cache_max_size, 1), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), @@ -187,7 +153,7 @@ acl_cache_cleanup(_) -> application:set_env(emqx, acl_cache_ttl, 100), application:set_env(emqx, acl_cache_max_size, 2), - SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>}, + SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external}, allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>), @@ -357,8 +323,8 @@ compile_rule(_) -> {deny, all} = compile({deny, all}). match_rule(_) -> - User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}}, - User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}}, + User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}, zone => external}, + User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}, zone => external}, {matched, allow} = match(User, <<"Test/Topic">>, {allow, all}), {matched, deny} = match(User, <<"Test/Topic">>, {deny, all}), diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl index 8ac76bfd9..e189ed69d 100644 --- a/test/emqx_mqtt_packet_SUITE.erl +++ b/test/emqx_mqtt_packet_SUITE.erl @@ -68,11 +68,11 @@ groups() -> [{connect, [sequence], ]}]. init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). init_per_group(_Group, Config) -> Config. @@ -85,7 +85,7 @@ case1_protocol_name(_) -> MqttPacket = serialize(?CASE1_PROTOCOL_NAME), emqx_client_sock:send(Sock, MqttPacket), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data), Disconnect = gen_tcp:recv(Sock, 0), ?assertEqual({error, closed}, Disconnect). @@ -95,7 +95,7 @@ case2_protocol_ver(_) -> emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), %% case1 Unacceptable protocol version - {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data), Disconnect = gen_tcp:recv(Sock, 0), ?assertEqual({error, closed}, Disconnect). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 60d298008..ec707e30d 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -55,7 +55,7 @@ groups() -> init_per_suite(Config) -> emqx_ct_helpers:start_apps([], fun set_special_configs/1), - MqttCaps = emqx_zone:get_env(external, '$mqtt_caps'), + MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()), emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}), Config. diff --git a/test/emqx_request_response_SUITE.erl b/test/emqx_request_response_SUITE.erl index 6709e958e..373619f1d 100644 --- a/test/emqx_request_response_SUITE.erl +++ b/test/emqx_request_response_SUITE.erl @@ -22,10 +22,11 @@ -include_lib("common_test/include/ct.hrl"). init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). + emqx_ct_helpers:start_apps([]), + Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + emqx_ct_helpers:stop_apps([]). all() -> [request_response]. From 138d7727faa6392ade07d31a0c14c0a5c6765074 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Thu, 20 Jun 2019 11:45:44 +0800 Subject: [PATCH 3/3] Compact windows platform which has no cpu_sup util function. (#2629) * Compact windows platform which has no cpu_sup util function. * Fix wrong os type clause * Do not start timer when platform is windows --- src/emqx_os_mon.erl | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 2b58a3749..21624c079 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -47,6 +47,11 @@ -define(OS_MON, ?MODULE). +-define(compat_windows(Expression), case os:type() of + {win32, nt} -> windows; + _Unix -> Expression + end). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -95,7 +100,7 @@ set_procmem_high_watermark(Float) -> %%------------------------------------------------------------------------------ init([Opts]) -> - _ = cpu_sup:util(), + _ = ?compat_windows(cpu_sup:util()), set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)), set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)), set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)), @@ -126,16 +131,18 @@ handle_call(_Request, _From, State) -> handle_cast(_Request, State) -> {noreply, State}. -handle_info({timeout, Timer, check}, State = #{timer := Timer, +handle_info({timeout, Timer, check}, State = #{timer := Timer, cpu_high_watermark := CPUHighWatermark, cpu_low_watermark := CPULowWatermark, is_cpu_alarm_set := IsCPUAlarmSet}) -> - case cpu_sup:util() of + case ?compat_windows(cpu_sup:util()) of 0 -> {noreply, State#{timer := undefined}}; {error, Reason} -> ?LOG(error, "Failed to get cpu utilization: ~p", [Reason]), {noreply, ensure_check_timer(State)}; + windows -> + {noreply, State}; Busy when Busy / 100 >= CPUHighWatermark -> alarm_handler:set_alarm({cpu_high_watermark, Busy}), {noreply, ensure_check_timer(State#{is_cpu_alarm_set := true})}; @@ -163,4 +170,3 @@ call(Req) -> ensure_check_timer(State = #{cpu_check_interval := Interval}) -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. -