%%-------------------------------------------------------------------- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_rule_metrics). -behaviour(gen_server). -include("rule_engine.hrl"). %% API functions -export([ start_link/0 , stop/0 ]). -export([ get_actions_taken/1 , get_actions_success/1 , get_actions_error/1 , get_actions_exception/1 , get_actions_retry/1 , get_rules_matched/1 , get_rules_failed/1 , get_rules_passed/1 , get_rules_exception/1 , get_rules_no_result/1 ]). -export([ inc_rules_matched/2 , inc_actions_taken/1 , inc_actions_taken/2 , inc_actions_success/1 , inc_actions_success/2 , inc_actions_error/1 , inc_actions_error/2 , inc_actions_exception/1 , inc_actions_exception/2 , inc_actions_retry/1 , inc_actions_retry/2 , inc_rules_matched/1 , inc_rules_failed/1 , inc_rules_passed/1 , inc_rules_exception/1 , inc_rules_no_result/1 ]). -export([ inc/2 , inc/3 , get/2 , get_rule_speed/1 , create_rule_metrics/1 , create_metrics/1 , clear_rule_metrics/1 , clear_metrics/1 ]). -export([ get_rule_metrics/1 , get_action_metrics/1 ]). %% gen_server callbacks -export([ init/1 , handle_call/3 , handle_info/2 , handle_cast/2 , code_change/3 , terminate/2 ]). -ifndef(TEST). -define(SECS_5M, 300). -define(SAMPLING, 10). -else. %% Use 5 secs average speed instead of 5 mins in case of testing -define(SECS_5M, 5). 
-define(SAMPLING, 1).
-endif.

%% persistent_term key under which the counters reference of a metric ID is stored.
-define(CRefID(ID), {?MODULE, ID}).
%% Number of samples that fit into one ?SECS_5M window.
-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).

-record(rule_speed, {
            max = 0 :: number(),
            current = 0 :: number(),
            last5m = 0 :: number(),
            %% metadata for calculating the avg speed
            tick = 1 :: number(),
            last_v = 0 :: number(),
            %% metadata for calculating the 5min avg speed
            last5m_acc = 0 :: number(),
            last5m_smpl = [] :: list()
        }).

-record(state, {
            metric_ids = sets:new(),
            rule_speeds :: undefined | #{rule_id() => #rule_speed{}},
            overall_rule_speed :: #rule_speed{}
        }).

%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------

%% Asks the metrics server to create the counters and the speed entry of a rule.
-spec create_rule_metrics(rule_id()) -> ok.
create_rule_metrics(Id) ->
    gen_server:call(?MODULE, {create_rule_metrics, Id}).

%% Asks the metrics server to create the counters of Id (no speed entry).
-spec create_metrics(rule_id()) -> ok.
create_metrics(Id) ->
    gen_server:call(?MODULE, {create_metrics, Id}).

%% Asks the metrics server to drop the counters and the speed entry of a rule.
-spec clear_rule_metrics(rule_id()) -> ok.
clear_rule_metrics(Id) ->
    gen_server:call(?MODULE, {delete_rule_metrics, Id}).

%% Asks the metrics server to drop the counters of Id.
-spec clear_metrics(rule_id()) -> ok.
clear_metrics(Id) ->
    gen_server:call(?MODULE, {delete_metrics, Id}).

%% Reads the current value of Metric for Id; 0 when Id has no counters.
-spec get(rule_id(), atom()) -> number().
get(Id, Metric) ->
    counter_value(couters_ref(Id), Metric).

%% Reads one counter slot, treating a missing counters reference as zero.
counter_value(not_found, _Metric) ->
    0;
counter_value(Ref, Metric) ->
    counters:get(Ref, metrics_idx(Metric)).

%% Fetches the formatted speed figures (max, current, last5m) of a rule
%% from the metrics server.
-spec get_rule_speed(rule_id()) -> map().
get_rule_speed(Id) ->
    gen_server:call(?MODULE, {get_rule_speed, Id}).

%% Collects the speed figures and all rule counters of Id into one map.
-spec get_rule_metrics(rule_id()) -> map().
get_rule_metrics(Id) ->
    #{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id),
    Matched = get_rules_matched(Id),
    Failed = get_rules_failed(Id),
    Passed = get_rules_passed(Id),
    Exception = get_rules_exception(Id),
    NoResult = get_rules_no_result(Id),
    #{matched => Matched,
      failed => Failed,
      passed => Passed,
      exception => Exception,
      no_result => NoResult,
      speed => Current,
      speed_max => Max,
      speed_last5m => Last5M
     }.

-spec get_action_metrics(action_instance_id()) -> map().
%% Summarises the action counters of Id; `failed` is the sum of the
%% error and exception counters.
get_action_metrics(Id) ->
    Success = get_actions_success(Id),
    Failed = get_actions_error(Id) + get_actions_exception(Id),
    Taken = get_actions_taken(Id),
    #{success => Success,
      failed => Failed,
      taken => Taken
     }.

%% Increments Metric of Id by one.
-spec inc(rule_id(), atom()) -> ok.
inc(Id, Metric) ->
    inc(Id, Metric, 1).

%% Adds Val to Metric of Id, creating the counters first when they are missing.
-spec inc(rule_id(), atom(), pos_integer()) -> ok.
inc(Id, Metric, Val) ->
    Ref =
        case couters_ref(Id) of
            not_found ->
                %% This may occur when increasing a counter for
                %% a rule that was created from a remote node.
                %% Metrics named "rules.*" get the full rule metrics (counters
                %% plus speed entry); everything else only gets counters.
                case lists:prefix("rules.", atom_to_list(Metric)) of
                    true -> create_rule_metrics(Id);
                    false -> create_metrics(Id)
                end,
                couters_ref(Id);
            Existing ->
                Existing
        end,
    counters:add(Ref, metrics_idx(Metric), Val).

%% Action counters ------------------------------------------------------------

inc_actions_taken(Id) ->
    inc_actions_taken(Id, 1).

inc_actions_taken(Id, Val) ->
    inc(Id, 'actions.taken', Val).

inc_actions_success(Id) ->
    inc_actions_success(Id, 1).

inc_actions_success(Id, Val) ->
    inc(Id, 'actions.success', Val).

inc_actions_error(Id) ->
    inc_actions_error(Id, 1).

inc_actions_error(Id, Val) ->
    inc(Id, 'actions.error', Val).

inc_actions_exception(Id) ->
    inc_actions_exception(Id, 1).

inc_actions_exception(Id, Val) ->
    inc(Id, 'actions.exception', Val).

inc_actions_retry(Id) ->
    inc_actions_retry(Id, 1).

inc_actions_retry(Id, Val) ->
    inc(Id, 'actions.retry', Val).

%% Rule counters --------------------------------------------------------------

inc_rules_matched(Id) ->
    inc_rules_matched(Id, 1).

inc_rules_matched(Id, Val) ->
    inc(Id, 'rules.matched', Val).

inc_rules_failed(Id) ->
    inc_rules_failed(Id, 1).

inc_rules_failed(Id, Val) ->
    inc(Id, 'rules.failed', Val).

inc_rules_passed(Id) ->
    inc_rules_passed(Id, 1).

inc_rules_passed(Id, Val) ->
    inc(Id, 'rules.passed', Val).

inc_rules_exception(Id) ->
    inc_rules_exception(Id, 1).

%% An exception is also counted as a failure.
inc_rules_exception(Id, Val) ->
    inc(Id, 'rules.failed', Val),
    inc(Id, 'rules.exception', Val).

inc_rules_no_result(Id) ->
    inc_rules_no_result(Id, 1).

%% A run without result is also counted as a failure.
inc_rules_no_result(Id, Val) ->
    inc(Id, 'rules.failed', Val),
    inc(Id, 'rules.no_result', Val).

%% Counter readers -------------------------------------------------------------

get_actions_taken(Id) ->
    get(Id, 'actions.taken').

get_actions_success(Id) ->
    get(Id, 'actions.success').
get_actions_error(Id) ->
    get(Id, 'actions.error').

get_actions_exception(Id) ->
    get(Id, 'actions.exception').

get_actions_retry(Id) ->
    get(Id, 'actions.retry').

get_rules_matched(Id) ->
    get(Id, 'rules.matched').

get_rules_failed(Id) ->
    get(Id, 'rules.failed').

get_rules_passed(Id) ->
    get(Id, 'rules.passed').

get_rules_exception(Id) ->
    get(Id, 'rules.exception').

get_rules_no_result(Id) ->
    get(Id, 'rules.no_result').

%% Starts the metrics server, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------

%% Traps exits and schedules the first `ticking` message; `rule_speeds`
%% stays undefined until the first rule metrics are created.
init([]) ->
    erlang:process_flag(trap_exit, true),
    %% the speed metrics
    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
    {ok, #state{overall_rule_speed = #rule_speed{}}}.

%% Synchronous requests:
%%   {get_rule_speed, Id}      -> formatted speed map (all-default when unknown)
%%   {create_metrics, Id}      -> create counters only
%%   {create_rule_metrics, Id} -> create counters and a fresh speed entry
%%   {delete_metrics, Id}      -> delete counters only
%%   {delete_rule_metrics, Id} -> delete counters and the speed entry
%% Any other request is answered with `ok`.
handle_call({get_rule_speed, _Id}, _From, State = #state{rule_speeds = undefined}) ->
    {reply, format_rule_speed(#rule_speed{}), State};
handle_call({get_rule_speed, Id}, _From, State = #state{rule_speeds = RuleSpeeds}) ->
    Speed = case maps:get(Id, RuleSpeeds, undefined) of
                undefined -> #rule_speed{};
                Found -> Found
            end,
    {reply, format_rule_speed(Speed), State};

handle_call({create_metrics, Id}, _From, State = #state{metric_ids = MIDs}) ->
    {reply, create_counters(Id),
     State#state{metric_ids = sets:add_element(Id, MIDs)}};

handle_call({create_rule_metrics, Id}, _From,
            State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) ->
    NewRuleSpeeds = case RuleSpeeds of
                        undefined -> #{Id => #rule_speed{}};
                        _ -> RuleSpeeds#{Id => #rule_speed{}}
                    end,
    {reply, create_counters(Id),
     State#state{metric_ids = sets:add_element(Id, MIDs),
                 rule_speeds = NewRuleSpeeds}};

%% This clause must match regardless of `rule_speeds`: restricting it to
%% `rule_speeds = undefined` would let the request fall through to the
%% catch-all clause as soon as any rule metrics exist, replying `ok` without
%% removing the counters or the ID.
handle_call({delete_metrics, Id}, _From, State = #state{metric_ids = MIDs}) ->
    {reply, delete_counters(Id),
     State#state{metric_ids = sets:del_element(Id, MIDs)}};

handle_call({delete_rule_metrics, Id}, _From,
            State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) ->
    NewRuleSpeeds = case RuleSpeeds of
                        undefined -> undefined;
                        _ -> maps:remove(Id, RuleSpeeds)
                    end,
    {reply, delete_counters(Id),
     State#state{metric_ids = sets:del_element(Id, MIDs),
                 rule_speeds = NewRuleSpeeds}};

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% On every `ticking` message: recompute the speed of each known rule from its
%% 'rules.matched' counter, trigger the resource status refresh and re-arm
%% the timer. All other messages are ignored.
handle_info(ticking, State = #state{rule_speeds = undefined}) ->
    async_refresh_resource_status(),
    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
    {noreply, State};
handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0}) ->
    RuleSpeeds = maps:map(
                   fun(Id, RuleSpeed) ->
                       calculate_speed(get_rules_matched(Id), RuleSpeed)
                   end, RuleSpeeds0),
    async_refresh_resource_status(),
    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
    {noreply, State#state{rule_speeds = RuleSpeeds}};
handle_info(_Info, State) ->
    {noreply, State}.

%% Rebuilds the counters of every known metric ID when the version passed in
%% Extra is "4.3.N" with N =< 6; otherwise the state is returned untouched.
%% The first clause handles downgrades, the second one every other change
%% that carries a single version in Extra.
code_change({down, _Vsn}, State = #state{metric_ids = MIDs}, [Vsn]) ->
    case needs_counters_rebuild(Vsn) of
        true -> lists:foreach(fun downgrade_counters/1, sets:to_list(MIDs));
        false -> ok
    end,
    {ok, State};
code_change(_Vsn, State = #state{metric_ids = MIDs}, [Vsn]) ->
    case needs_counters_rebuild(Vsn) of
        true -> lists:foreach(fun upgrade_counters/1, sets:to_list(MIDs));
        false -> ok
    end,
    {ok, State};
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% True when Vsn is "4.3.N" with N =< 6. A patch component that is not a
%% plain integer crashes with badmatch.
needs_counters_rebuild(Vsn) ->
    case string:tokens(Vsn, ".") of
        ["4", "3", SVal] ->
            {Val, []} = string:to_integer(SVal),
            Val =< 6;
        _ ->
            false
    end.

%% Recreates the counters of Id with 7 slots, carrying over the action counters
%% and storing the current 'rules.passed' value under 'rules.matched'.
%% NOTE(review): presumably 7 slots is the layout expected by the =< 4.3.6
%% releases - confirm against that version of this module.
downgrade_counters(Id) ->
    Passed = get_rules_passed(Id),
    Take = get_actions_taken(Id),
    Success = get_actions_success(Id),
    Error = get_actions_error(Id),
    Exception = get_actions_exception(Id),
    Retry = get_actions_retry(Id),
    ok = delete_counters(Id),
    ok = create_counters(Id, 7),
    inc_rules_matched(Id, Passed),
    inc_actions_taken(Id, Take),
    inc_actions_success(Id, Success),
    inc_actions_error(Id, Error),
    inc_actions_exception(Id, Exception),
    inc_actions_retry(Id, Retry).

%% Recreates the counters of Id with the current size, carrying over the action
%% counters and seeding both 'rules.matched' and 'rules.passed' from the old
%% 'rules.matched' value.
upgrade_counters(Id) ->
    Matched = get_rules_matched(Id),
    Take = get_actions_taken(Id),
    Success = get_actions_success(Id),
    Error = get_actions_error(Id),
    Exception = get_actions_exception(Id),
    Retry = get_actions_retry(Id),
    ok = delete_counters(Id),
    ok = create_counters(Id),
    inc_rules_matched(Id, Matched),
    inc_rules_passed(Id, Matched),
    inc_actions_taken(Id, Take),
    inc_actions_success(Id, Success),
    inc_actions_error(Id, Error),
    inc_actions_exception(Id, Exception),
    inc_actions_retry(Id, Retry).

%% Removes the counters of every known metric ID from persistent_term.
terminate(_Reason, #state{metric_ids = MIDs}) ->
    [delete_counters(Id) || Id <- sets:to_list(MIDs)],
    %% NOTE(review): nothing visible in this module stores a term under the
    %% bare ?MODULE key, so this erase looks like a no-op - confirm.
    persistent_term:erase(?MODULE).

stop() ->
    gen_server:stop(?MODULE).

%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------

%% Runs emqx_rule_engine:refresh_resource_status/0 in a separate, unlinked
%% process so the server is not blocked by it.
async_refresh_resource_status() ->
    spawn(emqx_rule_engine, refresh_resource_status, []).

%% Creates the counters of Id with the default size.
create_counters(Id) ->
    create_counters(Id, max_counters_size()).

%% Creates a counters array of Size slots for Id unless one already exists.
create_counters(Id, Size) ->
    case couters_ref(Id) of
        not_found ->
            ok = persistent_term:put(?CRefID(Id),
                                     counters:new(Size, [write_concurrency]));
        _Ref ->
            ok
    end.

%% Forgets the counters of Id; always returns ok.
delete_counters(Id) ->
    persistent_term:erase(?CRefID(Id)),
    ok.

%% Looks up the counters reference of Id; `not_found` when none is stored.
couters_ref(Id) ->
    try persistent_term:get(?CRefID(Id))
    catch
        error:badarg -> not_found
    end.
%% Advances the speed record of a rule by one sampling tick.
%% CurrVal is the current counter value; the speed is its difference to the
%% value seen at the previous tick, divided by the sampling interval.
calculate_speed(_CurrVal, undefined) ->
    undefined;
calculate_speed(CurrVal, #rule_speed{max = PrevMax,
                                     last_v = PrevVal,
                                     tick = Tick,
                                     last5m_acc = PrevAcc,
                                     last5m_smpl = PrevSamples}) ->
    %% current speed, based on the last value of the counter
    Speed = (CurrVal - PrevVal) / ?SAMPLING,

    %% max speed seen so far; the previous max wins on a tie
    Max = case PrevMax >= Speed of
              true -> PrevMax;
              false -> Speed
          end,

    %% average speed over the last-5-minutes window
    {Samples, Acc5Min, Avg5Min} =
        case Tick =< ?SAMPCOUNT_5M of
            true ->
                %% window not full yet: average over the ticks seen so far
                Sum = PrevAcc + Speed,
                {PrevSamples ++ [Speed], Sum, Sum / Tick};
            false ->
                %% window full: drop the oldest sample, append the newest
                [Oldest | Rest] = PrevSamples,
                Sum = PrevAcc + Speed - Oldest,
                {Rest ++ [Speed], Sum, Sum / ?SAMPCOUNT_5M}
        end,

    #rule_speed{max = Max,
                current = Speed,
                last5m = Avg5Min,
                last_v = CurrVal,
                last5m_acc = Acc5Min,
                last5m_smpl = Samples,
                tick = Tick + 1}.

%% Turns a speed record into the map handed out to callers; `current` and
%% `last5m` are rounded to two decimals, `max` is passed through as is.
format_rule_speed(#rule_speed{max = Max, current = Current, last5m = Last5Min}) ->
    RoundedCurrent = precision(Current, 2),
    RoundedLast5Min = precision(Last5Min, 2),
    #{max => Max, current => RoundedCurrent, last5m => RoundedLast5Min}.

%% Rounds Float to N decimal places.
precision(Float, N) ->
    Scale = math:pow(10, N),
    round(Float * Scale) / Scale.

%%------------------------------------------------------------------------------
%% Metrics Definitions
%%------------------------------------------------------------------------------

%% Number of slots in a newly created counters array.
max_counters_size() ->
    11.

%% Slot index of each metric; unknown metrics share slot 11.
metrics_idx('rules.matched')     -> 1;
metrics_idx('actions.success')   -> 2;
metrics_idx('actions.error')     -> 3;
metrics_idx('actions.taken')     -> 4;
metrics_idx('actions.exception') -> 5;
metrics_idx('actions.retry')     -> 6;
metrics_idx('rules.failed')      -> 7;
metrics_idx('rules.passed')      -> 8;
metrics_idx('rules.exception')   -> 9;
metrics_idx('rules.no_result')   -> 10;
metrics_idx(_)                   -> 11.