diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index f334204cc..15f214e03 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -477,20 +477,13 @@ replay(ClientInfo, [], Session0 = #{s := S0}) -> -spec replay_batch(stream_state(), session(), clientinfo()) -> session(). replay_batch(Ifs0, Session, ClientInfo) -> - #ifs{ - batch_begin_key = BatchBeginMsgKey, - batch_size = BatchSize, - it_end = ItEnd - } = Ifs0, - %% TODO: retry - {ok, ItBegin} = emqx_ds:update_iterator(?PERSISTENT_MESSAGE_DB, ItEnd, BatchBeginMsgKey), - Ifs1 = Ifs0#ifs{it_end = ItBegin}, - {Ifs, Inflight} = enqueue_batch(true, BatchSize, Ifs1, Session, ClientInfo), + #ifs{batch_size = BatchSize} = Ifs0, + %% TODO: retry on errors: + {Ifs, Inflight} = enqueue_batch(true, BatchSize, Ifs0, Session, ClientInfo), %% Assert: - Ifs =:= Ifs1 orelse - ?SLOG(warning, #{ - msg => "replay_inconsistency", - expected => Ifs1, + Ifs =:= Ifs0 orelse + ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{ + expected => Ifs0, got => Ifs }), Session#{inflight => Inflight}. @@ -645,7 +638,6 @@ new_batch({StreamKey, Ifs0}, BatchSize, Session = #{s := S0}, ClientInfo) -> first_seqno_qos1 = SN1, first_seqno_qos2 = SN2, batch_size = 0, - batch_begin_key = undefined, last_seqno_qos1 = SN1, last_seqno_qos2 = SN2 }, @@ -657,10 +649,16 @@ new_batch({StreamKey, Ifs0}, BatchSize, Session = #{s := S0}, ClientInfo) -> enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, ClientInfo) -> #ifs{ - it_end = It0, + it_begin = ItBegin, + it_end = ItEnd, first_seqno_qos1 = FirstSeqnoQos1, first_seqno_qos2 = FirstSeqnoQos2 } = Ifs0, + It0 = + case IsReplay of + true -> ItBegin; + false -> ItEnd + end, case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It0, BatchSize) of {ok, It, []} -> %% No new messages; just update the end iterator: @@ -668,13 +666,13 @@ enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, Cli {ok, end_of_stream} -> %% No new messages; just update the end iterator: {Ifs0#ifs{it_end = end_of_stream}, Inflight0}; - {ok, It, [{BatchBeginMsgKey, _} | _] = Messages} -> + {ok, It, Messages} -> {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch( IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0 ), Ifs = Ifs0#ifs{ + it_begin = It0, it_end = It, - batch_begin_key = BatchBeginMsgKey, %% TODO: it should be possible to avoid calling %% length here by diffing size of inflight before %% and after inserting messages: @@ -852,30 +850,30 @@ commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> SeqNo = packet_id_to_seqno(PacketId, S), case Track of puback -> - Old = ?committed(?QOS_1), - Next = ?next(?QOS_1); + MinTrack = ?committed(?QOS_1), + MaxTrack = ?next(?QOS_1); pubrec -> - Old = ?dup(?QOS_2), - Next = ?next(?QOS_2); + MinTrack = ?dup(?QOS_2), + MaxTrack = ?next(?QOS_2); pubcomp -> - Old = ?committed(?QOS_2), - Next = ?next(?QOS_2) + MinTrack = ?committed(?QOS_2), + MaxTrack = ?next(?QOS_2) end, - NextSeqNo = emqx_persistent_session_ds_state:get_seqno(Next, S), - PrevSeqNo = emqx_persistent_session_ds_state:get_seqno(Old, S), - case PrevSeqNo =< SeqNo andalso SeqNo =< NextSeqNo of + Min = emqx_persistent_session_ds_state:get_seqno(MinTrack, S), + Max = emqx_persistent_session_ds_state:get_seqno(MaxTrack, S), + case Min =< SeqNo andalso SeqNo =< Max of true -> %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(SessionId, <<>>, <<>>), - {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(Old, SeqNo, S)}}; + {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(MinTrack, SeqNo, S)}}; false -> ?SLOG(warning, #{ msg => "out-of-order_commit", track => Track, packet_id => PacketId, - commit_seqno => SeqNo, - prev => PrevSeqNo, - next => NextSeqNo + seqno => SeqNo, + min => Min, + max => Max }), {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index e7500606b..d8556c8c9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -42,16 +42,16 @@ -define(dup(QOS), {1, QOS}). %% Last seqno assigned to some message (that may reside in the %% mqueue): --define(next(QOS), {0, QOS}). +-define(next(QOS), {2, QOS}). %%%%% State of the stream: -record(ifs, { rank_x :: emqx_ds:rank_x(), rank_y :: emqx_ds:rank_y(), - %% Iterator at the end of the last batch: + %% Iterator at the beginning and end of the last batch: + it_begin :: emqx_ds:iterator() | undefined, it_end :: emqx_ds:iterator() | end_of_stream, %% Key that points at the beginning of the batch: - batch_begin_key :: binary() | undefined, batch_size = 0 :: non_neg_integer(), %% Session sequence number at the time when the batch was fetched: first_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(), diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index cfe366e2e..8f7cb5ca0 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -66,14 +66,13 @@ %% It's implemented as three maps: `clean', `dirty' and `tombstones'. %% Updates are made to the `dirty' area. `pmap_commit' function saves %% the updated entries to Mnesia and moves them to the `clean' area. --record(pmap, {table, clean, dirty, tombstones}). +-record(pmap, {table, cache, dirty}). -type pmap(K, V) :: #pmap{ table :: atom(), - clean :: #{K => V}, - dirty :: #{K => V}, - tombstones :: #{K => _} + cache :: #{K => V}, + dirty :: #{K => dirty | del} }. %% Session metadata: @@ -409,70 +408,56 @@ pmap_open(Table, SessionId) -> Clean = maps:from_list(kv_bag_restore(Table, SessionId)), #pmap{ table = Table, - clean = Clean, - dirty = #{}, - tombstones = #{} + cache = Clean, + dirty = #{} }. -spec pmap_get(K, pmap(K, V)) -> V | undefined. -pmap_get(K, #pmap{dirty = Dirty, clean = Clean}) -> - case Dirty of - #{K := V} -> - V; - _ -> - case Clean of - #{K := V} -> V; - _ -> undefined - end - end. +pmap_get(K, #pmap{cache = Cache}) -> + maps:get(K, Cache, undefined). -spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V). -pmap_put(K, V, Pmap = #pmap{dirty = Dirty, clean = Clean, tombstones = Tombstones}) -> +pmap_put(K, V, Pmap = #pmap{dirty = Dirty, cache = Cache}) -> Pmap#pmap{ - dirty = maps:put(K, V, Dirty), - clean = maps:remove(K, Clean), - tombstones = maps:remove(K, Tombstones) + cache = maps:put(K, V, Cache), + dirty = Dirty#{K => dirty} }. -spec pmap_del(K, pmap(K, V)) -> pmap(K, V). pmap_del( Key, - Pmap = #pmap{dirty = Dirty, clean = Clean, tombstones = Tombstones} + Pmap = #pmap{dirty = Dirty, cache = Cache} ) -> - %% Update the caches: Pmap#pmap{ - dirty = maps:remove(Key, Dirty), - clean = maps:remove(Key, Clean), - tombstones = Tombstones#{Key => del} + cache = maps:remove(Key, Cache), + dirty = Dirty#{Key => del} }. -spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A. -pmap_fold(Fun, Acc0, #pmap{clean = Clean, dirty = Dirty}) -> - Acc1 = maps:fold(Fun, Acc0, Dirty), - maps:fold(Fun, Acc1, Clean). +pmap_fold(Fun, Acc, #pmap{cache = Cache}) -> + maps:fold(Fun, Acc, Cache). -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V). pmap_commit( - SessionId, Pmap = #pmap{table = Tab, dirty = Dirty, clean = Clean, tombstones = Tombstones} + SessionId, Pmap = #pmap{table = Tab, dirty = Dirty, cache = Cache} ) -> - %% Commit deletions: - maps:foreach(fun(K, _) -> kv_bag_delete(Tab, SessionId, K) end, Tombstones), - %% Replace all records in the bag with the entries from the dirty area: maps:foreach( - fun(K, V) -> - kv_bag_persist(Tab, SessionId, K, V) + fun + (K, del) -> + kv_bag_delete(Tab, SessionId, K); + (K, dirty) -> + V = maps:get(K, Cache), + kv_bag_persist(Tab, SessionId, K, V) end, Dirty ), Pmap#pmap{ - dirty = #{}, - tombstones = #{}, - clean = maps:merge(Clean, Dirty) + dirty = #{} }. -spec pmap_format(pmap(_K, _V)) -> map(). -pmap_format(#pmap{clean = Clean, dirty = Dirty}) -> - maps:merge(Clean, Dirty). +pmap_format(#pmap{cache = Cache}) -> + Cache. %% Functions dealing with set tables: diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index d48d0af77..d572609e1 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -217,8 +217,8 @@ remove_fully_replayed_streams(S0) -> ). compare_streams( - #ifs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}, - #ifs{first_seqno_qos1 = B1, first_seqno_qos2 = B2} + {_KeyA, #ifs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}}, + {_KeyB, #ifs{first_seqno_qos1 = B1, first_seqno_qos2 = B2}} ) -> case A1 =:= B1 of true ->