From 39857626ce831c6e8c9a5f79a96119d49c1ef15a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 12 Jan 2024 02:06:32 +0100 Subject: [PATCH] test(sessds): Fix failing tests --- apps/emqx/src/emqx_persistent_session_ds.erl | 28 +++++--- apps/emqx/src/emqx_persistent_session_ds.hrl | 21 +++--- .../src/emqx_persistent_session_ds_state.erl | 1 + .../test/emqx_persistent_session_SUITE.erl | 69 ++++++++++--------- 4 files changed, 64 insertions(+), 55 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index d8019b6f1..7d4ab71d6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -234,7 +234,7 @@ info(mqueue_dropped, _Session) -> % info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> % AwaitingRel; info(awaiting_rel_cnt, #{s := S}) -> - seqno_diff(?QOS_2, ?dup(?QOS_2), ?committed(?QOS_2), S); + seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); info(awaiting_rel_max, #{props := Conf}) -> maps:get(max_awaiting_rel, Conf); info(await_rel_timeout, #{props := Conf}) -> @@ -602,6 +602,7 @@ session_ensure_new(Id, ConnInfo, Conf) -> ?committed(?QOS_1), ?next(?QOS_2), ?dup(?QOS_2), + ?rec, ?committed(?QOS_2) ] ), @@ -742,6 +743,7 @@ process_batch( Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S), Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S), + Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S), Subs = emqx_persistent_session_ds_state:get_subscriptions(S), Msgs = [ Msg @@ -784,11 +786,18 @@ process_batch( ?QOS_2 when SeqNoQos2 =< Comm2 -> %% QoS2 message has been PUBCOMP'ed by the client, ignore: Acc; - ?QOS_2 when SeqNoQos2 =< Dup2 -> + ?QOS_2 when SeqNoQos2 =< Rec -> %% QoS2 message has been PUBREC'ed by the client, resend PUBREL: emqx_persistent_session_ds_inflight:push({pubrel, SeqNoQos2}, 
Acc); + ?QOS_2 when SeqNoQos2 =< Dup2 -> + %% QoS2 message has been sent, but we haven't received PUBREC. + %% + %% TODO: According to the MQTT standard 4.3.3: + %% DUP flag is never set for QoS2 messages? We + %% do so for mem sessions, though. + Msg1 = emqx_message:set_flag(dup, true, Msg), + emqx_persistent_session_ds_inflight:push({SeqNoQos2, Msg1}, Acc); ?QOS_2 -> - %% MQTT standard 4.3.3: DUP flag is never set for QoS2 messages: emqx_persistent_session_ds_inflight:push({SeqNoQos2, Msg}, Acc) end, SeqNoQos1, @@ -821,13 +830,10 @@ do_drain_buffer(Inflight0, S0, Acc) -> case Msg#message.qos of ?QOS_0 -> do_drain_buffer(Inflight, S0, [{undefined, Msg} | Acc]); - ?QOS_1 -> - S = emqx_persistent_session_ds_state:put_seqno(?dup(?QOS_1), SeqNo, S0), - Publish = {seqno_to_packet_id(?QOS_1, SeqNo), Msg}, - do_drain_buffer(Inflight, S, [Publish | Acc]); - ?QOS_2 -> - Publish = {seqno_to_packet_id(?QOS_2, SeqNo), Msg}, - do_drain_buffer(Inflight, S0, [Publish | Acc]) + Qos -> + S = emqx_persistent_session_ds_state:put_seqno(?dup(Qos), SeqNo, S0), + Publish = {seqno_to_packet_id(Qos, SeqNo), Msg}, + do_drain_buffer(Inflight, S, [Publish | Acc]) end end. @@ -898,7 +904,7 @@ commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> MinTrack = ?committed(?QOS_1), MaxTrack = ?next(?QOS_1); pubrec -> - MinTrack = ?dup(?QOS_2), + MinTrack = ?rec, MaxTrack = ?next(?QOS_2); pubcomp -> MinTrack = ?committed(?QOS_2), diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 2d47052ca..6ab2d4c1f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -28,25 +28,22 @@ %%%%% Session sequence numbers: %% -%% -----|----------|----------|------> seqno -%% | | | -%% committed dup next +%% -----|----------|-----|-----|------> seqno +%% | | | | +%% committed rec dup next +% (Qos2) %% Seqno becomes committed after receiving PUBACK for QoS1 or PUBCOMP %% for QoS2. 
-define(committed(QOS), QOS). -%% Seqno becomes dup: +%% Seqno becomes dup after broker sends QoS1 or QoS2 message to the +%% client. Upon session reconnect, messages with seqno in the +%% committed..dup range are retransmitted with DUP flag. %% -%% 1. After broker sends QoS1 message to the client. Upon session -%% reconnect, QoS1 messages with seqno in the committed..dup range are -%% retransmitted with DUP flag. -%% -%% 2. After it receives PUBREC from the client for the QoS2 message. -%% Upon session reconnect, PUBREL messages for QoS2 messages with -%% seqno in committed..dup are retransmitted. -define(dup(QOS), (10 + QOS)). +-define(rec, 22). %% Last seqno assigned to a message. --define(next(QOS), (20 + QOS)). +-define(next(QOS), (30 + QOS)). %%%%% State of the stream: -record(ifs, { diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 6e03a1c32..fbd4fcc22 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -101,6 +101,7 @@ | ?committed(?QOS_1) | ?next(?QOS_2) | ?dup(?QOS_2) + | ?rec | ?committed(?QOS_2). -opaque t() :: #{ diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 007b737c2..008fc177c 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -36,7 +36,7 @@ all() -> % NOTE % Tests are disabled while existing session persistence impl is being % phased out. - %%{group, persistence_disabled}, + {group, persistence_disabled}, {group, persistence_enabled} ]. 
@@ -54,10 +54,9 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), TCsNonGeneric = [t_choose_impl], - % {group, quic}, {group, ws}], - TCGroups = [{group, tcp}], + TCGroups = [{group, tcp}, {group, quic}, {group, ws}], [ - %% {persistence_disabled, TCGroups}, + {persistence_disabled, TCGroups}, {persistence_enabled, TCGroups}, {tcp, [], TCs}, {quic, [], TCs -- TCsNonGeneric}, @@ -677,6 +676,7 @@ t_publish_many_while_client_is_gone_qos1(Config) -> ), NAcked = 4, + ?assert(NMsgs1 >= NAcked), [ok = emqtt:puback(Client1, PktId) || #{packet_id := PktId} <- lists:sublist(Msgs1, NAcked)], %% Ensure that PUBACKs are propagated to the channel. @@ -690,17 +690,18 @@ t_publish_many_while_client_is_gone_qos1(Config) -> #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1}, #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1}, #mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1}, - #mqtt_msg{topic = <<"msg/feed/me2">>, payload = <<"M12">>, qos = 1} + #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1} ], ok = publish_many(Pubs2), NPubs2 = length(Pubs2), + %% Now reconnect with auto ack to make sure all streams are + %% replayed till the end: {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, {clientid, ClientId}, {properties, #{'Session-Expiry-Interval' => 30}}, - {clean_start, false}, - {auto_ack, false} + {clean_start, false} | Config ]), @@ -717,9 +718,9 @@ t_publish_many_while_client_is_gone_qos1(Config) -> ct:pal("Msgs2 = ~p", [Msgs2]), ?assert(NMsgs2 < NPubs, {NMsgs2, '<', NPubs}), - %% ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}), - %% ?assert(NMsgs2 >= NPubs - NAcked, Msgs2), - NSame = max(0, NMsgs2 - NPubs2), + ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}), + ?assert(NMsgs2 >= NPubs - NAcked, Msgs2), + NSame = NMsgs2 - NPubs2, ?assert( lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame)) ), @@ -780,6 +781,11 @@ t_publish_many_while_client_is_gone(Config) -> %% for its 
subscriptions after the client dies or reconnects, in addition %% to PUBRELs for the messages it has PUBRECed. While client must send %% PUBACKs and PUBRECs in order, those orders are independent of each other. + %% + %% Developer's note: for simplicity we publish all messages to the + %% same topic, since persistent session ds may reorder messages + %% that belong to different streams, and this particular test is + %% very sensitive to the order. ClientId = ?config(client_id, Config), ConnFun = ?config(conn_fun, Config), ClientOpts = [ @@ -792,20 +798,18 @@ t_publish_many_while_client_is_gone(Config) -> {ok, Client1} = emqtt:start_link([{clean_start, true} | ClientOpts]), {ok, _} = emqtt:ConnFun(Client1), - {ok, _, [?QOS_1]} = emqtt:subscribe(Client1, <<"t/+/foo">>, ?QOS_1), - {ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"msg/feed/#">>, ?QOS_2), - {ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"loc/+/+/+">>, ?QOS_2), + {ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"t">>, ?QOS_2), Pubs1 = [ - #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M1">>, qos = 1}, - #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M2">>, qos = 1}, - #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M3">>, qos = 2}, - #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 2}, - #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M5">>, qos = 2}, - #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M6">>, qos = 1}, - #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M7">>, qos = 2}, - #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M8">>, qos = 1}, - #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M9">>, qos = 2} + #mqtt_msg{topic = <<"t">>, payload = <<"M1">>, qos = 1}, + #mqtt_msg{topic = <<"t">>, payload = <<"M2">>, qos = 1}, + #mqtt_msg{topic = <<"t">>, payload = <<"M3">>, qos = 2}, + #mqtt_msg{topic = <<"t">>, payload = <<"M4">>, qos = 2}, + #mqtt_msg{topic = <<"t">>, payload = <<"M5">>, qos = 2}, + #mqtt_msg{topic = <<"t">>, payload = <<"M6">>, qos = 1}, + #mqtt_msg{topic = <<"t">>, 
payload = <<"M7">>, qos = 2}, + #mqtt_msg{topic = <<"t">>, payload = <<"M8">>, qos = 1}, + #mqtt_msg{topic = <<"t">>, payload = <<"M9">>, qos = 2} ], ok = publish_many(Pubs1), NPubs1 = length(Pubs1), @@ -827,7 +831,7 @@ t_publish_many_while_client_is_gone(Config) -> [PktId || #{qos := 1, packet_id := PktId} <- Msgs1] ), - %% PUBREC first `NRecs` QoS 2 messages. + %% PUBREC first `NRecs` QoS 2 messages (up to "M5") NRecs = 3, PubRecs1 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs1], NRecs), lists:foreach( @@ -851,9 +855,9 @@ t_publish_many_while_client_is_gone(Config) -> maybe_kill_connection_process(ClientId, Config), Pubs2 = [ - #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M10">>, qos = 2}, - #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M11">>, qos = 1}, - #mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M12">>, qos = 2} + #mqtt_msg{topic = <<"t">>, payload = <<"M10">>, qos = 2}, + #mqtt_msg{topic = <<"t">>, payload = <<"M11">>, qos = 1}, + #mqtt_msg{topic = <<"t">>, payload = <<"M12">>, qos = 2} ], ok = publish_many(Pubs2), NPubs2 = length(Pubs2), @@ -886,8 +890,8 @@ t_publish_many_while_client_is_gone(Config) -> Msgs2Dups ), - %% Now complete all yet incomplete QoS 2 message flows instead. - PubRecs2 = [PktId || #{qos := 2, packet_id := PktId} <- Msgs2], + %% Ack more messages: + PubRecs2 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs2], 2), lists:foreach( fun(PktId) -> ok = emqtt:pubrec(Client2, PktId) end, PubRecs2 @@ -903,6 +907,7 @@ t_publish_many_while_client_is_gone(Config) -> %% PUBCOMP every PUBREL. PubComps = [PktId || {pubrel, #{packet_id := PktId}} <- PubRels1 ++ PubRels2], + ct:pal("PubComps: ~p", [PubComps]), lists:foreach( fun(PktId) -> ok = emqtt:pubcomp(Client2, PktId) end, PubComps @@ -910,19 +915,19 @@ t_publish_many_while_client_is_gone(Config) -> %% Ensure that PUBCOMPs are propagated to the channel. 
pong = emqtt:ping(Client2), - + %% Reconnect for the last time ok = disconnect_client(Client2), maybe_kill_connection_process(ClientId, Config), {ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]), {ok, _} = emqtt:ConnFun(Client3), - %% Only the last unacked QoS 1 message should be retransmitted. + %% Check that the messages are retransmitted with DUP=1: Msgs3 = receive_messages(NPubs, _Timeout = 2000), ct:pal("Msgs3 = ~p", [Msgs3]), ?assertMatch( - [#{topic := <<"t/100/foo">>, payload := <<"M11">>, qos := 1, dup := true}], - Msgs3 + [<<"M10">>, <<"M11">>, <<"M12">>], + [I || #{payload := I} <- Msgs3] ), ok = disconnect_client(Client3).