emqx/apps/emqx_retainer/src/emqx_retainer_mnesia.erl

635 lines
19 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_mnesia).
-behaviour(emqx_retainer).
-include("emqx_retainer.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("stdlib/include/qlc.hrl").
%% emqx_retainer callbacks
-export([
delete_message/2,
store_retained/2,
read_message/2,
page_read/4,
match_messages/3,
clear_expired/1,
clean/1,
size/1
]).
%% Internal exports (RPC)
-export([
do_store_retained/1,
do_clear_expired/0,
do_delete_message/1,
do_populate_index_meta/1,
do_reindex_batch/2
]).
%% Management API:
-export([topics/0]).
-export([create_resource/1]).
-export([reindex/2, reindex_status/0]).
-ifdef(TEST).
-export([populate_index_meta/0]).
-export([reindex/3]).
-endif.
-record(retained_message, {topic, msg, expiry_time}).
-record(retained_index, {key, expiry_time}).
-record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
-define(META_KEY, index_meta).
-define(CLEAR_BATCH_SIZE, 1000).
-define(REINDEX_BATCH_SIZE, 1000).
-define(REINDEX_DISPATCH_WAIT, 30000).
%%--------------------------------------------------------------------
%% Management API
%%--------------------------------------------------------------------
topics() ->
[emqx_topic:join(I) || I <- mnesia:dirty_all_keys(?TAB_MESSAGE)].
%%--------------------------------------------------------------------
%% emqx_retainer callbacks
%%--------------------------------------------------------------------
create_resource(#{storage_type := StorageType}) ->
ok = create_table(
?TAB_INDEX_META,
retained_index_meta,
record_info(fields, retained_index_meta),
set,
StorageType
),
ok = populate_index_meta(),
ok = create_table(
?TAB_MESSAGE,
retained_message,
record_info(fields, retained_message),
ordered_set,
StorageType
),
ok = create_table(
?TAB_INDEX,
retained_index,
record_info(fields, retained_index),
ordered_set,
StorageType
).
create_table(Table, RecordName, Attributes, Type, StorageType) ->
Copies =
case StorageType of
ram -> ram_copies;
disc -> disc_copies
end,
StoreProps = [
{ets, [
compressed,
{read_concurrency, true},
{write_concurrency, true}
]},
{dets, [{auto_save, 1000}]}
],
ok = mria:create_table(Table, [
{type, Type},
{rlog_shard, ?RETAINER_SHARD},
{storage, Copies},
{record_name, RecordName},
{attributes, Attributes},
{storage_properties, StoreProps}
]),
ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
case mnesia:table_info(Table, storage_type) of
Copies ->
ok;
_Other ->
{atomic, ok} = mnesia:change_table_copy_type(Table, node(), Copies),
ok
end.
store_retained(_, Msg = #message{topic = Topic}) ->
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of
{atomic, ok} ->
?tp(debug, message_retained, #{topic => Topic}),
ok;
{aborted, Reason} ->
?SLOG(error, #{
msg => "failed_to_retain_message",
topic => Topic,
reason => Reason
})
end.
do_store_retained(#message{topic = Topic} = Msg) ->
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
Tokens = topic_to_tokens(Topic),
case is_table_full() of
false ->
store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
_ ->
case mnesia:read(?TAB_MESSAGE, Tokens, write) of
[_] ->
store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
[] ->
mnesia:abort(table_is_full)
end
end.
clear_expired(_) ->
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0),
ok.
do_clear_expired() ->
NowMs = erlang:system_time(millisecond),
QH = qlc:q([
TopicTokens
|| #retained_message{
topic = TopicTokens,
expiry_time = ExpiryTime
} <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
]),
QC = qlc:cursor(QH),
clear_batch(db_indices(write), QC).
delete_message(_, Topic) ->
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]),
ok.
do_delete_message(Topic) ->
Tokens = topic_to_tokens(Topic),
case emqx_topic:wildcard(Topic) of
false ->
ok = delete_message_by_topic(Tokens, db_indices(write));
true ->
QH = topic_search_table(Tokens),
qlc:fold(
fun(TopicTokens, _) ->
ok = delete_message_by_topic(TopicTokens, db_indices(write))
end,
undefined,
QH
)
end.
read_message(_, Topic) ->
{ok, read_messages(Topic)}.
match_messages(_, Topic, undefined) ->
Tokens = topic_to_tokens(Topic),
Now = erlang:system_time(millisecond),
QH = search_table(Tokens, Now),
case batch_read_number() of
all_remaining ->
{ok, qlc:eval(QH), undefined};
BatchNum when is_integer(BatchNum) ->
Cursor = qlc:cursor(QH),
match_messages(undefined, Topic, {Cursor, BatchNum})
end;
match_messages(_, _Topic, {Cursor, BatchNum}) ->
case qlc_next_answers(Cursor, BatchNum) of
{closed, Rows} ->
{ok, Rows, undefined};
{more, Rows} ->
{ok, Rows, {Cursor, BatchNum}}
end.
page_read(_, Topic, Page, Limit) ->
Now = erlang:system_time(millisecond),
QH =
case Topic of
undefined ->
search_table(undefined, ['#'], Now);
_ ->
Tokens = topic_to_tokens(Topic),
search_table(Tokens, Now)
end,
OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
Cursor = qlc:cursor(OrderedQH),
NSkip = (Page - 1) * Limit,
SkipResult =
case NSkip > 0 of
true ->
{Result, _} = qlc_next_answers(Cursor, NSkip),
Result;
false ->
more
end,
PageRows =
case SkipResult of
closed ->
[];
more ->
case qlc_next_answers(Cursor, Limit) of
{closed, Rows} ->
Rows;
{more, Rows} ->
qlc:delete_cursor(Cursor),
Rows
end
end,
{ok, PageRows}.
clean(_) ->
_ = mria:clear_table(?TAB_MESSAGE),
_ = mria:clear_table(?TAB_INDEX),
ok.
size(_) ->
table_size().
reindex(Force, StatusFun) ->
reindex(config_indices(), Force, StatusFun).
reindex_status() ->
case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of
[#retained_index_meta{reindexing = true}] ->
true;
_ ->
false
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
store_retained(Indices, Msg, Tokens, ExpiryTime) ->
ok = store_retained_message(Msg, Tokens, ExpiryTime),
ok = emqx_retainer_index:foreach_index_key(
fun(Key) -> store_retained_index(Key, ExpiryTime) end,
Indices,
Tokens
).
store_retained_message(Msg, Tokens, ExpiryTime) ->
RetainedMessage = #retained_message{
topic = Tokens,
msg = Msg,
expiry_time = ExpiryTime
},
mnesia:write(?TAB_MESSAGE, RetainedMessage, write).
store_retained_index(Key, ExpiryTime) ->
RetainedIndex = #retained_index{
key = Key,
expiry_time = ExpiryTime
},
mnesia:write(?TAB_INDEX, RetainedIndex, write).
topic_search_table(Tokens) ->
Index = emqx_retainer_index:select_index(Tokens, db_indices(read)),
topic_search_table(Index, Tokens).
topic_search_table(undefined, Tokens) ->
Cond = emqx_retainer_index:condition(Tokens),
Ms = [{#retained_message{topic = Cond, msg = '_', expiry_time = '_'}, [], ['$_']}],
MsgQH = mnesia:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]),
qlc:q([Topic || #retained_message{topic = Topic} <- MsgQH]);
topic_search_table(Index, Tokens) ->
Cond = emqx_retainer_index:condition(Index, Tokens),
Ms = [{#retained_index{key = Cond, expiry_time = '_'}, [], ['$_']}],
IndexQH = mnesia:table(?TAB_INDEX, [{traverse, {select, Ms}}]),
qlc:q([
emqx_retainer_index:restore_topic(Key)
|| #retained_index{key = Key} <- IndexQH
]).
search_table(Tokens, Now) ->
Indices = dirty_read_indices(),
Index = emqx_retainer_index:select_index(Tokens, Indices),
search_table(Index, Tokens, Now).
search_table(undefined, Tokens, Now) ->
Ms = make_message_match_spec(Tokens, Now),
ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
search_table(Index, Tokens, Now) ->
Ms = make_index_match_spec(Index, Tokens, Now),
Topics = [
emqx_retainer_index:restore_topic(Key)
|| #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
],
RetainedMsgQH = qlc:q([
ets:lookup(?TAB_MESSAGE, TopicTokens)
|| TopicTokens <- Topics
]),
qlc:q([
Msg
|| [
#retained_message{
msg = Msg,
expiry_time = ExpiryTime
}
] <- RetainedMsgQH,
(ExpiryTime == 0) or (ExpiryTime > Now)
]).
dirty_read_indices() ->
case ets:lookup(?TAB_INDEX_META, ?META_KEY) of
[#retained_index_meta{read_indices = ReadIndices}] -> ReadIndices;
[] -> []
end.
clear_batch(Indices, QC) ->
{Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
lists:foreach(
fun(TopicTokens) -> delete_message_by_topic(TopicTokens, Indices) end,
Rows
),
case Result of
closed -> ok;
more -> clear_batch(Indices, QC)
end.
delete_message_by_topic(TopicTokens, Indices) ->
ok = emqx_retainer_index:foreach_index_key(
fun(Key) ->
mnesia:delete({?TAB_INDEX, Key})
end,
Indices,
TopicTokens
),
ok = mnesia:delete({?TAB_MESSAGE, TopicTokens}).
compare_message(M1, M2) ->
M1#message.timestamp =< M2#message.timestamp.
topic_to_tokens(Topic) ->
emqx_topic:words(Topic).
-spec read_messages(emqx_types:topic()) ->
[emqx_types:message()].
read_messages(Topic) ->
Tokens = topic_to_tokens(Topic),
case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
[] ->
[];
[#retained_message{msg = Msg, expiry_time = Et}] ->
case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of
true -> [Msg];
false -> []
end
end.
qlc_next_answers(QC, N) ->
case qlc:next_answers(QC, N) of
NextAnswers when
is_list(NextAnswers) andalso
length(NextAnswers) < N
->
qlc:delete_cursor(QC),
{closed, NextAnswers};
NextAnswers when is_list(NextAnswers) ->
{more, NextAnswers};
{error, Module, Reason} ->
qlc:delete_cursor(QC),
error({qlc_error, Module, Reason})
end.
make_message_match_spec(Tokens, NowMs) ->
Cond = emqx_retainer_index:condition(Tokens),
MsHd = #retained_message{topic = Cond, msg = '$2', expiry_time = '$3'},
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$2']}].
make_index_match_spec(Index, Tokens, NowMs) ->
Cond = emqx_retainer_index:condition(Index, Tokens),
MsHd = #retained_index{key = Cond, expiry_time = '$3'},
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
-spec is_table_full() -> boolean().
is_table_full() ->
Limit = emqx:get_config([retainer, backend, max_retained_messages]),
Limit > 0 andalso (table_size() >= Limit).
-spec table_size() -> non_neg_integer().
table_size() ->
mnesia:table_info(?TAB_MESSAGE, size).
config_indices() ->
lists:sort(emqx_config:get([retainer, backend, index_specs])).
populate_index_meta() ->
ConfigIndices = config_indices(),
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of
{atomic, ok} ->
ok;
{atomic, {error, DBWriteIndices, DBReadIndices}} ->
?SLOG(warning, #{
msg => "emqx_retainer_outdated_indices",
config_indices => ConfigIndices,
db_write_indices => DBWriteIndices,
db_read_indices => DBReadIndices
}),
ok;
{aborted, Reason} ->
?SLOG(error, #{
msg => "failed_to_populate_emqx_retainer_indices",
reason => Reason
}),
{error, Reason}
end.
do_populate_index_meta(ConfigIndices) ->
case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
[
#retained_index_meta{
read_indices = ReadIndices,
write_indices = WriteIndices,
reindexing = Reindexing
}
] ->
case {ReadIndices, WriteIndices, Reindexing} of
{_, _, true} ->
ok;
{ConfigIndices, ConfigIndices, false} ->
ok;
{DBWriteIndices, DBReadIndices, false} ->
{error, DBWriteIndices, DBReadIndices}
end;
[] ->
mnesia:write(
?TAB_INDEX_META,
#retained_index_meta{
key = ?META_KEY,
read_indices = ConfigIndices,
write_indices = ConfigIndices,
reindexing = false
},
write
)
end.
db_indices(Type) ->
case mnesia:read(?TAB_INDEX_META, ?META_KEY) of
[#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
case Type of
read -> ReadIndices;
write -> WriteIndices
end;
[] ->
[]
end.
batch_read_number() ->
case emqx:get_config([retainer, flow_control, batch_read_number]) of
0 -> all_remaining;
BatchNum when is_integer(BatchNum) -> BatchNum
end.
reindex(NewIndices, Force, StatusFun) when
is_boolean(Force) andalso is_function(StatusFun, 1)
->
%% Disable read indices and update write indices so that new records are written
%% with correct indices. Also block parallel reindexing.
case try_start_reindex(NewIndices, Force) of
{atomic, ok} ->
%% Wait for all dispatch operations to be completed to avoid
%% inconsistent results.
true = wait_dispatch_complete(?REINDEX_DISPATCH_WAIT),
%% All new dispatch operations will see
%% indices disabled, so we feel free to clear index table.
%% Clear old index records.
{atomic, ok} = mria:clear_table(?TAB_INDEX),
%% Fill index records in batches.
QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
%% Enable read indices and unlock reindexing.
finalize_reindex();
{atomic, Reason} ->
Reason
end.
try_start_reindex(NewIndices, true) ->
%% Note: we don't expect reindexing during upgrade, so this function is internal
mria:transaction(
?RETAINER_SHARD,
fun() -> start_reindex(NewIndices) end
);
try_start_reindex(NewIndices, false) ->
mria:transaction(
?RETAINER_SHARD,
fun() ->
case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
[#retained_index_meta{reindexing = false}] ->
start_reindex(NewIndices);
_ ->
{error, already_started}
end
end
).
start_reindex(NewIndices) ->
?tp(warning, retainer_message_reindexing_started, #{
hint => <<"during reindexing, subscription performance may degrade">>
}),
mnesia:write(
?TAB_INDEX_META,
#retained_index_meta{
key = ?META_KEY,
read_indices = [],
write_indices = NewIndices,
reindexing = true
},
write
).
finalize_reindex() ->
%% Note: we don't expect reindexing during upgrade, so this function is internal
{atomic, ok} = mria:transaction(
?RETAINER_SHARD,
fun() ->
case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
[#retained_index_meta{write_indices = WriteIndices} = Meta] ->
mnesia:write(
?TAB_INDEX_META,
Meta#retained_index_meta{
key = ?META_KEY,
read_indices = WriteIndices,
reindexing = false
},
write
);
[] ->
ok
end
end
),
?tp(warning, retainer_message_reindexing_finished, #{}),
ok.
reindex_topic(Indices, Topic) ->
case mnesia:read(?TAB_MESSAGE, Topic, read) of
[#retained_message{expiry_time = ExpiryTime}] ->
ok = emqx_retainer_index:foreach_index_key(
fun(Key) -> store_retained_index(Key, ExpiryTime) end,
Indices,
Topic
);
[] ->
ok
end.
reindex_batch(QC, Done, StatusFun) ->
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of
{atomic, {more, NewDone}} ->
_ = StatusFun(NewDone),
reindex_batch(QC, NewDone, StatusFun);
{atomic, {closed, NewDone}} ->
_ = StatusFun(NewDone),
ok;
{aborted, Reason} ->
?SLOG(error, #{
msg => "failed_to_reindex_retained_messages",
reason => Reason
}),
{error, Reason}
end.
do_reindex_batch(QC, Done) ->
Indices = db_indices(write),
{Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
ok = lists:foreach(
fun(Topic) -> reindex_topic(Indices, Topic) end,
Topics
),
{Status, Done + length(Topics)}.
wait_dispatch_complete(Timeout) ->
Nodes = mria_mnesia:running_nodes(),
{Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout),
lists:all(
fun(Result) -> Result =:= ok end,
Results
).