Untitled
unknown
python
2 years ago
15 kB
8
Indexable
# # INTEL CONFIDENTIAL # # Copyright (C) 2022 Intel Corporation # # This software and the related documents are Intel copyrighted materials, and # your use of them is governed by the express license under which they were # provided to you (License). Unless the License provides otherwise, you may not # use, modify, copy, publish, distribute, disclose or transmit this software or # the related documents without Intel's prior written permission. # # This software and the related documents are provided as is, with no express or # implied warranties, other than those that are expressly stated in the License. # from base.pytm_base_test import PytmBaseTest from libs.wifi.utils.decorators import ap_running, sta_connected, ip_connectivity, ap_mld_link, legacy_air_sniffer from tests.wifi.kpis.performance.common.ap.soft_ap_config import ht_20_capab, ht_40_capab from libs.common.utils.event_waiters.ewait_base import KernelWaiter from libs.wifi.utils.packet_analyzer import packet_analyzer from dpkt.ieee80211 import IE_EXTENSION from libs.common.utils.iperf_session.single import IperfSession from libs.common.utils.ping.ping import Ping import re import slash import time import json EID_EXT_EHT_CAPA = 108 EID_EXT_EHT_MULTI_LINK = 107 class LinuxWiFi7Tests(PytmBaseTest): def _validate_ml_element(self, ie, mld_addr, link2_addr): slash.logger.info(f'ie={str(ie.info)}') slash.logger.info(type(ie.info)) control = [0x80, 0x01] pos = 0 for i in range(0, 2): assert control[i] == ie.info[i], \ f'Multi Link control mismatch in byte {i}: expected {control[i]}, got {ie.info[i]}' # control and common info length pos += 3 mld_addr_hex = [int(byte, 16) for byte in mld_addr.split(':')] slash.logger.info(f'mld_addr_hex={mld_addr_hex}') for i in range(0, 6): assert mld_addr_hex[i] == ie.info[pos + i], \ f'Multi Link mac address mismatch: expected {mld_addr_hex[i]}, got {ie.info[i]}' #mac address pos += 6 eml_capab = [0x33, 0x00] for i in range(0, 2): assert eml_capab[i] == ie.info[pos + i], \ f'Multi Link capabilities mismatch: expected {eml_capab[i]}, got {ie.info[pos+i]}' # eml_capab pos += 2 assert ie.info[pos] & 0xf == 0x0, \ f'Multi Link capabilities and operations mismatch: expected 0x00, got {ie.info[pos]}' # eml_cpab_n_oper pos += 2 # todo: verify that the subelement exists, and that thre is only one. assert ie.info[pos] == 0x00, \ f'Multi Link: wrong subelement ID: expected 0x00, got {ie.info[pos]}' # Subelement ID and Subelemnt length pos += 2 sta_control_start = ie.info[pos] assert sta_control_start & 0xf == 2, \ f'Multi Link STA control: wrong link ID. Expected 2, got {sta_control_start & 0xf}' # Sta Control and Length of Sta Info pos += 3 link2_addr_hex = [int(byte, 16) for byte in link2_addr.split(':')] slash.logger.info(f'link2_addr_hex={link2_addr_hex}') for i in range(0, 6): assert link2_addr_hex[i] == ie.info[pos + i], \ f'Multi Link STA Info: wrong link addres. Expected {link2_addr_hex[i]}, got {ie.info[pos + i]}' def _verify_eml_notif(self, link, snif, mode, links_bitmap): slash.logger.info('Check if EML notification is valid') frame_type = '(wlan.fc.type_subtype == 0xd)' ta = f'(wlan.ta == {link.addr})' ra = f'(wlan.ra == {link.bssid})' filter = f'{frame_type} && {ta} && {ra}' found_eml = False for timestamp, rawdata in snif.get_packets(filter): tap, packet = snif.parse_wifi_frame(rawdata) if hasattr(packet, "action"): if packet.action.category != 37 or packet.action.code != 6: continue found_eml = True eml_control = packet.mgmt.data[3] emlsr_mode = eml_control & 1 assert emlsr_mode == mode, \ f'Wrong emlsr mode. Expected {mode}, got {emlsr_mode}' links_msk = eml_control & 0xfc links = links_msk >> 2 assert links == links_bitmap, \ f'Wrong link bitamp. Expected {links_bitmap}, got {links}' assert found_eml, 'EML Notification not found' def _get_links_info(self): sta = self.resources.sta links = sta.wifi.station.wpa.get_links_status() for lnk in links: if lnk.link_id == '5': hb_link = lnk else: lb_link = lnk return hb_link, lb_link """ def _get_frame_types_filters(bssid, link_addr): return { \ 'MU-RTS': \ lambda pkt: pkt.type == C_CTL_TYPE and pkt.subtype == C_TRIGGER and \ binascii.hexlify(pkt.trigger.src).decode() == bssid and \ binascii.hexlify(pkt.trigger.dst).decode() == "ffffffffffff", \ 'CTS': \ lambda pkt: pkt.type == CTL_TYPE and pkt.subtype == C_CTS and \ binascii.hexlify(pkt.trigger.dst).decode() == bssid, \ 'Ping': \ lambda pkt: pkt.type == DATA_TYPE and pkt.subtype == D_QOS_DATA and \ binascii.hexlify(pkt.trigger.src).decode() == bssid and \ binascii.hexlify(pkt.trigger.dst).decode() == link_addr, \ 'ACK': \ lambda pkt: packet.type == CTL_TYPE and packet.subtype == C_ACK and \ binascii.hexlify(packet.trigger.dst).decode() == bssid \ } """ def _verify_frames_order(self, found_dict, frame): for framename, found in found_dict: if framename == frame: break assert found, f'Found {frame} without {framename}' @ap_running("ap", channel=1, hw_mode='g', country_code="FI", ieee80211n='1', ieee80211ax='1', ieee80211be='1', ieee80211d='1', ieee80211h='1', ht_capab=ht_20_capab, mld_ap='1', mld_id='0', mld_link_id='2') @ap_mld_link("ap", channel=36, hw_mode='a', ieee80211ac='1', ieee80211ax='1', ieee80211be='1', ieee80211n='1', ht_capab=ht_40_capab, vht_oper_chwidth=1, vht_oper_centr_freq_seg0_idx=42, he_oper_chwidth=1, he_oper_centr_freq_seg0_idx=42, eht_oper_chwidth=1, eht_oper_centr_freq_seg0_idx=42, mld_ap='1', mld_id='0', mld_link_id='5') @ip_connectivity() def test_two_links_connection(self): sta = self.resources.sta ap = self.resources.ap snif1 = self.resources.sniffer1.wifi.legacy_sniffer snif2 = self.resources.sniffer2.wifi.legacy_sniffer sniffers = [snif1, snif2] snif1.start_sniffer("link1.pcap", channel=1) snif2.start_sniffer("link2.pcap", channel=36) # Set HB as prefered Association link sta.wifi.station.wpa.set("mld_connect_band_pref", "2") sta.wifi.station.wpa.set("p2p_disabled", "1") connection_success = sta.wifi.station.connect(ap.wifi.soft_ap) assert connection_success, "Station failed to connect to the AP" #snif1.stop_sniffer() #snif2.stop_sniffer() time.sleep(3) # Get links info (hb_link, lb_link) = self._get_links_info() links = [lb_link, hb_link] slash.logger.info(f'hb_link = {hb_link}') slash.logger.info(f'lb_link = {lb_link}') ''' # Check that Probe request contains an EHT element slash.logger.info('Checker: verify that the probe request has an EHT Capability') frame_type = '(wlan.fc.type == 0 && wlan.fc.subtype == 4)' sta_address = sta.wifi.station.wpa.status().address ta = f'wlan.ta == {sta_address}' found_probe = False for packet in snif2.get_packets(f'{frame_type} && {ta}', json_format=True): found_probe = True assert packet_analyzer.is_eht_supported(packet), \ 'Probe request does not have EHT Capabilities element' assert found_probe, 'Probe request was not found' # Check assoc req has ML elem slash.logger.info('Checker: verify that assoc request contains an ML element') found_assoc_req = False frame_type = '(wlan.fc.type_subtype == 0)' ta = f'wlan.ta == {hb_link.addr}' ra = f'wlan.ra == {hb_link.bssid}' for timestamp, rawdata in snif2.get_packets(f'{frame_type} && {ta} && {ra}'): found_assoc_req = True tap, packet = snif2.parse_wifi_frame(rawdata) for ie in packet.ies: if ie.id == IE_EXTENSION and ie.eid_extension == EID_EXT_EHT_MULTI_LINK: self._validate_ml_element(ie, sta_address, lb_link.addr) break else: assert False, 'Assoc req does not contain a ML element' assert found_assoc_req, 'Assoc Request was not found' ''' # Open BA session by sending a large anount of data #ping = Ping(ap, sta.wifi.ipaddr, slash.logger) #ping.run(count=20, timeout=60, ping_loss_threshold=100) slash.logger.info("Running Tx traffic") with IperfSession(ap, sta, sta.wifi.ipaddr, 5) as iperf_session: iperf_session.parameters.send_udp_traffic = True iperf_session.parameters.port = 5555 iperf_session.parameters.bandwidth_limit = "10M" iperf_session.start() iperf_session.wait() iperf_session.print_output() results = iperf_session.get_results() assert results.loss_rate <= 80.0, "too high packet loss" #snif1.start_sniffer("link1.pcap", channel=1) #snif2.start_sniffer("link2.pcap", channel=36) slash.logger.info('Wait 3 seconds for the sniffers to start') time.sleep(3) slash.logger.info("Activate second link to enter eSR mode") valid_links_path = sta.wifi.debugfs.get_wifi_debugfs_paths('valid_links', all_paths=True)[0] active_links_path = sta.wifi.debugfs.get_wifi_debugfs_paths('active_links', all_paths=True)[0] sta.wifi.debugfs.write(active_links_path, '0x24') slash.logger.info(f'active links is = {sta.wifi.debugfs.read(active_links_path)}') slash.logger.info('Wait 3 seconds for eSR to start') time.sleep(3) slash.logger.info("Configure AP to send MU-RTS to both links") aid = ap.wifi.soft_ap.sta_aid(sta.wifi.station.macaddr_hex) slash.logger.info(f'AID = {aid}') slash.logger.info("Set HE AID in sniffer and sleep (1 sec)") for snif, link in zip(sniffers, links): snif.set_he_aid(aid, link.bssid) time.sleep(1) ''' for idx, link in enumerate(links): slash.logger.info(f'send cmd for {idx}: link_addr= {link.addr} link_bssid = {link.bssid}') ap.wifi.soft_ap.ul_periodic_trigger_frame(sta.wifi.station.macaddr_hex, mld=True, link_addr=link.addr, link_bssid=link.bssid, link_id=idx, periodic_msec=0, trig_type=6) ''' slash.logger.info('Wait 3 seconds for the trigger frame cmd to be sent') time.sleep(1) #ping.run(count=20, timeout=60, ping_loss_threshold=100) assert False slash.logger.info('Deactivate 2nd links') sta.wifi.debugfs.write(active_links_path, '0x20') slash.logger.info('Wait 3 seconds for eSR deactivation') time.sleep(3) snif1.stop_sniffer() snif2.stop_sniffer() time.sleep(3) # Check that an EML Notifications was sent upon entering eSR self._check_eml_notif(hb_link, snif2, 1, 0x24) # Check that we have sequences of MU-RTS->CTS->PING->ACK on the same link frames = ['MU-RTS', 'CTS', 'Ping', 'ACK'] for link, snif in zip(links, sniffers): found = {frame: False for frame in frames} bssid = link.bssid.replace(':', '') link_addr = link.addr.replace(':', '') #filters = self._get_frame_types_filters(bssid, link_addr) for timestamp, rawdata in snif.get_packets(): tap, pkt = snif.parse_wifi_frame(rawdata) """ for frame_type in frames: if filters[frame_type](pkt): self._verify_frames_order(found, frame_type) found[frame_type] = True if frame_type == 'ACK': found = {frame: False for frame in frames} break """ if pkt.type == CTL_TYPE and pkt.subtype == C_TRIGGER and \ binascii.hexlify(pkt.trigger.src).decode() == bssid and \ binascii.hexlify(pkt.trigger.dst).decode() == "ffffffffffff": found['MU-RTS'] = True elif pkt.type == CTL_TYPE and pkt.subtype == C_CTS and \ binascii.hexlify(pkt.trigger.dst).decode() == bssid: self._verify_frames_order(found, 'CTS') found['CTS'] = True elif pkt.type == DATA_TYPE and pkt.subtype == D_QOS_DATA and \ binascii.hexlify(pkt.trigger.src).decode() == bssid and \ binascii.hexlify(pkt.trigger.dst).decode() == link_addr: self._verify_frames_order(found, 'Ping') found['Ping'] = True elif pkt.type == CTL_TYPE and pkt.subtype == C_ACK and \ binascii.hexlify(pkt.trigger.dst).decode() == bssid: self._verify_frames_order(found, 'ACK') found['ACK'] = True found = {frame: False for frame in frames} # Check that a EML Notification was sent upon exiting eSR self._verify_eml_notif(hb_link, snif2, 0, 0x20) sta.wifi.station.disconnect() assert False
Editor is loading...