diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 8f5eb3646..a7be853bf 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -55,6 +55,7 @@ -export([ validate_heap_size/1 , parse_user_lookup_fun/1 , validate_alarm_actions/1 + , validations/0 ]). % workaround: prevent being recognized as unused functions @@ -1593,6 +1594,29 @@ validate_tls_versions(Versions) -> Vs -> {error, {unsupported_ssl_versions, Vs}} end. +validations() -> + [{check_process_watermark, fun check_process_watermark/1} + ,{check_cpu_watermark, fun check_cpu_watermark/1} + ]. + +%% validations from emqx_conf_schema, we must filter other *_schema by undefined. +check_process_watermark(Conf) -> + check_watermark("sysmon.vm.process_low_watermark", "sysmon.vm.process_high_watermark", Conf). + +check_cpu_watermark(Conf) -> + check_watermark("sysmon.os.cpu_low_watermark", "sysmon.os.cpu_high_watermark", Conf). + +check_watermark(LowKey, HighKey, Conf) -> + case hocon_maps:get(LowKey, Conf) of + undefined -> true; + Low -> + High = hocon_maps:get(HighKey, Conf), + case Low < High of + true -> true; + false -> {bad_watermark, #{LowKey => Low, HighKey => High}} + end + end. + str(A) when is_atom(A) -> atom_to_list(A); str(B) when is_binary(B) -> diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 779cc52ae..b5bee052f 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -205,27 +205,37 @@ transform_header_name(Headers) -> maps:put(K, V, Acc) end, #{}, Headers). -check_ssl_opts(Conf) - when Conf =:= #{} -> - true; check_ssl_opts(Conf) -> - case emqx_authz_http:parse_url(hocon_maps:get("config.url", Conf)) of - #{scheme := https} -> - case hocon_maps:get("config.ssl.enable", Conf) of - true -> ok; - false -> false - end; - #{scheme := http} -> - ok + case hocon_maps:get("config.url", Conf) of + undefined -> true; + Url -> + case emqx_authz_http:parse_url(Url) of + #{scheme := https} -> + case hocon_maps:get("config.ssl.enable", Conf) of + true -> true; + _ -> {error, ssl_not_enable} + end; + #{scheme := http} -> true; + Bad -> {bad_scheme, Url, Bad} + end end. -check_headers(Conf) - when Conf =:= #{} -> - true; check_headers(Conf) -> - Method = to_bin(hocon_maps:get("config.method", Conf)), - Headers = hocon_maps:get("config.headers", Conf), - Method =:= <<"post">> orelse (not lists:member(<<"content-type">>, Headers)). + case hocon_maps:get("config.method", Conf) of + undefined -> true; + Method0 -> + Method = to_bin(Method0), + Headers = hocon_maps:get("config.headers", Conf), + case Method of + <<"post">> -> true; + _ when Headers =:= undefined -> true; + _ when is_list(Headers) -> + case lists:member(<<"content-type">>, Headers) of + false -> true; + true -> {Method0, do_not_include_content_type} + end + end + end. union_array(Item) when is_list(Item) -> hoconsc:array(hoconsc:union(Item)). diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index d15e10136..aaf1966bd 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -36,7 +36,7 @@ file/0, cipher/0]). --export([namespace/0, roots/0, fields/1, translations/0, translation/1]). +-export([namespace/0, roots/0, fields/1, translations/0, translation/1, validations/0]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). %% Static apps which merge their configs into the merged emqx.conf @@ -103,6 +103,10 @@ roots() -> emqx_schema:roots(low) ++ lists:flatmap(fun roots/1, ?MERGED_CONFIGS). +validations() -> + hocon_schema:validations(emqx_schema) ++ + lists:flatmap(fun hocon_schema:validations/1, ?MERGED_CONFIGS). + fields("cluster") -> [ {"name", sc(atom(), diff --git a/lib-ee/emqx_license/etc/emqx_license.conf b/lib-ee/emqx_license/etc/emqx_license.conf index 331d98aa8..6c5ad217b 100644 --- a/lib-ee/emqx_license/etc/emqx_license.conf +++ b/lib-ee/emqx_license/etc/emqx_license.conf @@ -1,3 +1,5 @@ license { key = "MjIwMTExCjAKMTAKRXZhbHVhdGlvbgpjb250YWN0QGVtcXguaW8KMjAyMjAxMDEKMzY1MDAKMTAK.MEUCIFc9EUjqB3SjpRqWjqmAzI4Tg4LwhCRet9scEoxMRt8fAiEAk6vfYUiPOTzBC+3EjNF3WmLTiA3B0TN5ZNwuTKbTXJQ=" + connection_low_watermark = 75%, + connection_high_watermark = 80% } diff --git a/lib-ee/emqx_license/include/emqx_license.hrl b/lib-ee/emqx_license/include/emqx_license.hrl index 93a8c8cec..7c530c4b8 100644 --- a/lib-ee/emqx_license/include/emqx_license.hrl +++ b/lib-ee/emqx_license/include/emqx_license.hrl @@ -21,7 +21,7 @@ "======================================================\n" "Your license has expired.\n" "Please visit https://emqx.com/apply-licenses/emqx or\n" - "contact our customer services for an updated license.\n" + "contact customer services.\n" "======================================================\n" ). diff --git a/lib-ee/emqx_license/src/emqx_license.erl b/lib-ee/emqx_license/src/emqx_license.erl index 412a1a1d3..b4698aa0f 100644 --- a/lib-ee/emqx_license/src/emqx_license.erl +++ b/lib-ee/emqx_license/src/emqx_license.erl @@ -112,12 +112,12 @@ del_license_hook() -> _ = emqx_hooks:del('client.connect', {?MODULE, check, []}), ok. -do_update({file, Filename}, _Conf) -> +do_update({file, Filename}, Conf) -> case file:read_file(Filename) of {ok, Content} -> case emqx_license_parser:parse(Content) of {ok, _License} -> - #{<<"file">> => Filename}; + maps:remove(<<"key">>, Conf#{<<"file">> => Filename}); {error, Reason} -> erlang:throw(Reason) end; @@ -125,13 +125,16 @@ do_update({file, Filename}, _Conf) -> erlang:throw({invalid_license_file, Reason}) end; -do_update({key, Content}, _Conf) when is_binary(Content); is_list(Content) -> +do_update({key, Content}, Conf) when is_binary(Content); is_list(Content) -> case emqx_license_parser:parse(Content) of {ok, _License} -> - #{<<"key">> => Content}; + maps:remove(<<"file">>, Conf#{<<"key">> => Content}); {error, Reason} -> erlang:throw(Reason) - end. + end; +%% We don't do extra action when update license's watermark. +do_update(_Other, Conf) -> + Conf. check_max_clients_exceeded(MaxClients) -> emqx_license_resources:connection_count() > MaxClients * 1.1. diff --git a/lib-ee/emqx_license/src/emqx_license_checker.erl b/lib-ee/emqx_license/src/emqx_license_checker.erl index 5571eaada..8007ef3ce 100644 --- a/lib-ee/emqx_license/src/emqx_license_checker.erl +++ b/lib-ee/emqx_license/src/emqx_license_checker.erl @@ -10,6 +10,7 @@ -behaviour(gen_server). -define(CHECK_INTERVAL, 5000). +-define(EXPIRY_ALARM_CHECK_INTERVAL, 24 * 60* 60). -export([start_link/1, start_link/2, @@ -70,16 +71,18 @@ purge() -> init([LicenseFetcher, CheckInterval]) -> case LicenseFetcher() of {ok, License} -> - ?LICENSE_TAB = ets:new(?LICENSE_TAB, [set, protected, named_table]), + ?LICENSE_TAB = ets:new(?LICENSE_TAB, [set, protected, named_table, {read_concurrency, true}]), #{} = check_license(License), - State = ensure_timer(#{check_license_interval => CheckInterval, + State0 = ensure_check_license_timer(#{check_license_interval => CheckInterval, license => License}), + State = ensure_check_expiry_timer(State0), {ok, State}; - {error, _} = Error -> - Error + {error, Reason} -> + {stop, Reason} end. handle_call({update, License}, _From, State) -> + _ = expiry_early_alarm(License), {reply, check_license(License), State#{license => License}}; handle_call(dump, _From, #{license := License} = State) -> {reply, emqx_license_parser:dump(License), State}; @@ -94,10 +97,15 @@ handle_cast(_Msg, State) -> handle_info(check_license, #{license := License} = State) -> #{} = check_license(License), - NewState = ensure_timer(State), + NewState = ensure_check_license_timer(State), ?tp(debug, emqx_license_checked, #{}), {noreply, NewState}; +handle_info(check_expiry_alarm, #{license := License} = State) -> + _ = expiry_early_alarm(License), + NewState = ensure_check_expiry_timer(State), + {noreply, NewState}; + handle_info(_Msg, State) -> {noreply, State}. @@ -105,15 +113,25 @@ handle_info(_Msg, State) -> %% Private functions %%------------------------------------------------------------------------------ -ensure_timer(#{check_license_interval := CheckInterval} = State) -> - _ = case State of - #{timer := Timer} -> erlang:cancel_timer(Timer); - _ -> ok - end, +ensure_check_license_timer(#{check_license_interval := CheckInterval} = State) -> + cancel_timer(State, timer), State#{timer => erlang:send_after(CheckInterval, self(), check_license)}. +ensure_check_expiry_timer(State) -> + cancel_timer(State, expiry_alarm_timer), + Ref = erlang:send_after(?EXPIRY_ALARM_CHECK_INTERVAL, self(), check_expiry_alarm), + State#{expiry_alarm_timer => Ref}. + +cancel_timer(State, Key) -> + _ = case maps:find(Key, State) of + {ok, Ref} when is_reference(Ref) -> erlang:cancel_timer(Ref); + _ -> ok + end, + ok. + check_license(License) -> - NeedRestrict = need_restrict(License), + DaysLeft = days_left(License), + NeedRestrict = need_restrict(License, DaysLeft), Limits = limits(License, NeedRestrict), true = apply_limits(Limits), #{warn_evaluation => warn_evaluation(License, NeedRestrict), @@ -133,17 +151,25 @@ days_left(License) -> {DateNow, _} = calendar:universal_time(), calendar:date_to_gregorian_days(DateEnd) - calendar:date_to_gregorian_days(DateNow). -need_restrict(License)-> - DaysLeft = days_left(License), +need_restrict(License, DaysLeft)-> CType = emqx_license_parser:customer_type(License), Type = emqx_license_parser:license_type(License), DaysLeft < 0 - andalso (Type =/= ?OFFICIAL) or small_customer_overexpired(CType, DaysLeft). + andalso (Type =/= ?OFFICIAL) orelse small_customer_over_expired(CType, DaysLeft). -small_customer_overexpired(?SMALL_CUSTOMER, DaysLeft) +small_customer_over_expired(?SMALL_CUSTOMER, DaysLeft) when DaysLeft < ?EXPIRED_DAY -> true; -small_customer_overexpired(_CType, _DaysLeft) -> false. +small_customer_over_expired(_CType, _DaysLeft) -> false. apply_limits(Limits) -> ets:insert(?LICENSE_TAB, {limits, Limits}). + +expiry_early_alarm(License) -> + case days_left(License) < 30 of + true -> + DateEnd = emqx_license_parser:expiry_date(License), + catch emqx_alarm:activate(license_expiry, #{expiry_at => DateEnd}); + false -> + catch emqx_alarm:deactivate(license_expiry) + end. diff --git a/lib-ee/emqx_license/src/emqx_license_resources.erl b/lib-ee/emqx_license/src/emqx_license_resources.erl index 33d661b1b..79bbc113b 100644 --- a/lib-ee/emqx_license/src/emqx_license_resources.erl +++ b/lib-ee/emqx_license/src/emqx_license_resources.erl @@ -60,6 +60,7 @@ handle_cast(_Msg, State) -> handle_info(update_resources, State) -> true = update_resources(), + connection_quota_early_alarm(), ?tp(debug, emqx_license_resources_updated, #{}), {noreply, ensure_timer(State)}. @@ -72,6 +73,24 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Private functions %%------------------------------------------------------------------------------ +connection_quota_early_alarm() -> + connection_quota_early_alarm(emqx_license_checker:limits()). + +connection_quota_early_alarm({ok, #{max_connections := Max}}) when is_integer(Max) -> + Count = connection_count(), + Low = emqx_conf:get([license, connection_low_watermark], 0.75), + High = emqx_conf:get([license, connection_high_watermark], 0.80), + if + Count > Max * High -> + HighPercent = float_to_binary(High * 100, [{decimals, 0}]), + Message = iolist_to_binary(["License: live connection number exceeds ", HighPercent, "%"]), + catch emqx_alarm:activate(license_quota, #{high_watermark => HighPercent}, Message); + Count < Max * Low -> + catch emqx_alarm:deactivate(license_quota); + true -> + ok + end; +connection_quota_early_alarm(_Limits) -> ok. cached_remote_connection_count() -> try ets:lookup(?MODULE, remote_connection_count) of diff --git a/lib-ee/emqx_license/src/emqx_license_schema.erl b/lib-ee/emqx_license/src/emqx_license_schema.erl index bd4a471f4..e35235777 100644 --- a/lib-ee/emqx_license/src/emqx_license_schema.erl +++ b/lib-ee/emqx_license/src/emqx_license_schema.erl @@ -12,20 +12,46 @@ -behaviour(hocon_schema). --export([roots/0, fields/1]). +-export([roots/0, fields/1, validations/0]). -roots() -> [{license, hoconsc:union( - [hoconsc:ref(?MODULE, key_license), - hoconsc:ref(?MODULE, file_license)])}]. +roots() -> [{license, + hoconsc:mk(hoconsc:union([hoconsc:ref(?MODULE, key_license), + hoconsc:ref(?MODULE, file_license)]), + #{desc => "TODO"})} + ]. fields(key_license) -> [ {key, #{type => string(), sensitive => true, %% so it's not logged desc => "Configure the license as a string" }} - ]; + | common_fields()]; fields(file_license) -> [ {file, #{type => string(), desc => "Path to the license file" }} + | common_fields()]. + +common_fields() -> + [ + {connection_low_watermark, #{type => emqx_schema:percent(), + default => "75%", desc => "" + }}, + {connection_high_watermark, #{type => emqx_schema:percent(), + default => "80%", desc => "" + }} ]. + +validations() -> + [ {check_license_watermark, fun check_license_watermark/1}]. + +check_license_watermark(Conf) -> + case hocon_maps:get("license.connection_low_watermark", Conf) of + undefined -> true; + Low -> + High = hocon_maps:get("license.connection_high_watermark", Conf), + case High =/= undefined andalso High > Low of + true -> true; + false -> {bad_license_watermark, #{high => High, low => Low}} + end + end.