From b4bbfad415b8e4a555748f610d45e84fe4096e4e Mon Sep 17 00:00:00 2001 From: tigercl Date: Thu, 22 Aug 2019 16:12:14 +0800 Subject: [PATCH] Fix will retain checking (#2820) Fix will retain checking and handle the retained flag correctly --- src/emqx_packet.erl | 5 +++-- src/emqx_protocol.erl | 8 ++++---- src/emqx_session.erl | 14 ++++++++------ 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 19d93933a..da075e436 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -162,10 +162,11 @@ will_msg(#mqtt_packet_connect{client_id = ClientId, will_qos = QoS, will_topic = Topic, will_props = Properties, - will_payload = Payload}) -> + will_payload = Payload, + proto_ver = ProtoVer}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg#message{flags = #{dup => false, retain => Retain}, - headers = merge_props(#{username => Username}, Properties)}. + headers = merge_props(#{username => Username, proto_ver => ProtoVer}, Properties)}. merge_props(Headers, undefined) -> Headers; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 6ecea634e..e4fe32874 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -562,10 +562,10 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Creden %%------------------------------------------------------------------------------ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), - PState = #pstate{session = SPid, credentials = Credentials}) -> + PState = #pstate{session = SPid, credentials = Credentials, proto_ver = ProtoVer}) -> Msg = emqx_mountpoint:mount(mountpoint(Credentials), emqx_packet:to_message(Credentials, Packet)), - puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState). + puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, ProtoVer, Msg))), PState). %%------------------------------------------------------------------------------ %% Puback -> Client @@ -834,8 +834,8 @@ check_will_retain(#mqtt_packet_connect{will_retain = false, proto_ver = ?MQTT_PR ok; check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) -> case emqx_zone:get_env(Zone, mqtt_retain_available, true) of - true -> {error, ?RC_RETAIN_NOT_SUPPORTED}; - false -> ok + true -> ok; + false -> {error, ?RC_RETAIN_NOT_SUPPORTED} end; check_will_retain(_Packet, _PState) -> ok. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index fee2210c9..201179af6 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -877,12 +877,14 @@ process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #sta true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State); false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State) end; -process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) -> - process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State); -process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) -> - process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, State); -process_subopts([{rap, _}|Opts], Msg, State) -> - process_subopts(Opts, Msg, State); +process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags, headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) -> + process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session); +process_subopts([{rap, _}|Opts], Msg = #message{headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) -> + process_subopts(Opts, Msg, Session); +process_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session = #session{}) -> + process_subopts(Opts, Msg, Session); +process_subopts([{rap, _}|Opts], Msg = #message{flags = Flags}, Session) -> + process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session); process_subopts([{subid, SubId}|Opts], Msg, State) -> process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).