rumqttc/v5/mqttbytes/v5/
unsubscribe.rs

1use super::*;
2use bytes::{Buf, Bytes};
3
4/// Unsubscribe packet
5#[derive(Debug, Clone, PartialEq, Eq, Default)]
6pub struct Unsubscribe {
7    pub pkid: u16,
8    pub filters: Vec<String>,
9    pub properties: Option<UnsubscribeProperties>,
10}
11
12impl Unsubscribe {
13    pub fn new<S: Into<String>>(filter: S, properties: Option<UnsubscribeProperties>) -> Self {
14        Self {
15            filters: vec![filter.into()],
16            properties,
17            ..Default::default()
18        }
19    }
20
21    pub fn size(&self) -> usize {
22        let len = self.len();
23        let remaining_len_size = len_len(len);
24
25        1 + remaining_len_size + len
26    }
27
28    fn len(&self) -> usize {
29        // Packet id + length of filters (unlike subscribe, this just a string.
30        // Hence 2 is prefixed for len per filter)
31        let mut len = 2 + self.filters.iter().fold(0, |s, t| 2 + s + t.len());
32
33        if let Some(p) = &self.properties {
34            let properties_len = p.len();
35            let properties_len_len = len_len(properties_len);
36            len += properties_len_len + properties_len;
37        } else {
38            // just 1 byte representing 0 len
39            len += 1;
40        }
41
42        len
43    }
44
45    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Unsubscribe, Error> {
46        let variable_header_index = fixed_header.fixed_header_len;
47        bytes.advance(variable_header_index);
48
49        let pkid = read_u16(&mut bytes)?;
50        let properties = UnsubscribeProperties::read(&mut bytes)?;
51
52        let mut filters = Vec::with_capacity(1);
53        while bytes.has_remaining() {
54            let filter = read_mqtt_string(&mut bytes)?;
55            filters.push(filter);
56        }
57
58        let unsubscribe = Unsubscribe {
59            pkid,
60            filters,
61            properties,
62        };
63        Ok(unsubscribe)
64    }
65
66    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
67        buffer.put_u8(0xA2);
68
69        // write remaining length
70        let remaining_len = self.len();
71        let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
72
73        // write packet id
74        buffer.put_u16(self.pkid);
75
76        if let Some(p) = &self.properties {
77            p.write(buffer)?;
78        } else {
79            write_remaining_length(buffer, 0)?;
80        }
81
82        // write filters
83        for filter in self.filters.iter() {
84            write_mqtt_string(buffer, filter);
85        }
86
87        Ok(1 + remaining_len_bytes + remaining_len)
88    }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq)]
92pub struct UnsubscribeProperties {
93    pub user_properties: Vec<(String, String)>,
94}
95
96impl UnsubscribeProperties {
97    fn len(&self) -> usize {
98        let mut len = 0;
99
100        for (key, value) in self.user_properties.iter() {
101            len += 1 + 2 + key.len() + 2 + value.len();
102        }
103
104        len
105    }
106
107    pub fn read(bytes: &mut Bytes) -> Result<Option<UnsubscribeProperties>, Error> {
108        let mut user_properties = Vec::new();
109
110        let (properties_len_len, properties_len) = length(bytes.iter())?;
111        bytes.advance(properties_len_len);
112
113        if properties_len == 0 {
114            return Ok(None);
115        }
116
117        let mut cursor = 0;
118        // read until cursor reaches property length. properties_len = 0 will skip this loop
119        while cursor < properties_len {
120            let prop = read_u8(bytes)?;
121            cursor += 1;
122
123            match property(prop)? {
124                PropertyType::UserProperty => {
125                    let key = read_mqtt_string(bytes)?;
126                    let value = read_mqtt_string(bytes)?;
127                    cursor += 2 + key.len() + 2 + value.len();
128                    user_properties.push((key, value));
129                }
130                _ => return Err(Error::InvalidPropertyType(prop)),
131            }
132        }
133
134        Ok(Some(UnsubscribeProperties { user_properties }))
135    }
136
137    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
138        let len = self.len();
139        write_remaining_length(buffer, len)?;
140
141        for (key, value) in self.user_properties.iter() {
142            buffer.put_u8(PropertyType::UserProperty as u8);
143            write_mqtt_string(buffer, key);
144            write_mqtt_string(buffer, value);
145        }
146
147        Ok(())
148    }
149}
150
151#[cfg(test)]
152mod test {
153    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
154    use super::*;
155    use bytes::BytesMut;
156    use pretty_assertions::assert_eq;
157
158    #[test]
159    fn length_calculation() {
160        let mut dummy_bytes = BytesMut::new();
161        // Use user_properties to pad the size to exceed ~128 bytes to make the
162        // remaining_length field in the packet be 2 bytes long.
163        let unsubscribe_props = UnsubscribeProperties {
164            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
165        };
166
167        let unsubscribe_pkt = Unsubscribe::new("hello/world", Some(unsubscribe_props));
168
169        let size_from_size = unsubscribe_pkt.size();
170        let size_from_write = unsubscribe_pkt.write(&mut dummy_bytes).unwrap();
171        let size_from_bytes = dummy_bytes.len();
172
173        assert_eq!(size_from_write, size_from_bytes);
174        assert_eq!(size_from_size, size_from_bytes);
175    }
176}