# Paste-site listing metadata (not part of the module):
#   Untitled / unknown / python / 3 years ago / 15 kB / 9 / Indexable
#
# INTEL CONFIDENTIAL
#
# Copyright (C) 2022 Intel Corporation
#
# This software and the related documents are Intel copyrighted materials, and
# your use of them is governed by the express license under which they were
# provided to you (License). Unless the License provides otherwise, you may not
# use, modify, copy, publish, distribute, disclose or transmit this software or
# the related documents without Intel's prior written permission.
#
# This software and the related documents are provided as is, with no express or
# implied warranties, other than those that are expressly stated in the License.
#
from base.pytm_base_test import PytmBaseTest
from libs.wifi.utils.decorators import ap_running, sta_connected, ip_connectivity, ap_mld_link, legacy_air_sniffer
from tests.wifi.kpis.performance.common.ap.soft_ap_config import ht_20_capab, ht_40_capab
from libs.common.utils.event_waiters.ewait_base import KernelWaiter
from libs.wifi.utils.packet_analyzer import packet_analyzer
from dpkt.ieee80211 import IE_EXTENSION
from libs.common.utils.iperf_session.single import IperfSession
from libs.common.utils.ping.ping import Ping
import re
import slash
import time
import json
# Element ID Extension numbers, meant to be compared with `ie.eid_extension`
# on elements whose `ie.id` is IE_EXTENSION.
# NOTE(review): in this file they are only referenced from disabled checks.
EID_EXT_EHT_CAPA = 108
EID_EXT_EHT_MULTI_LINK = 107
class LinuxWiFi7Tests(PytmBaseTest):
    """Multi-link connection tests against a soft AP exposing two MLD links.

    The private helpers parse sniffer captures and check raw frame contents.
    Every check fails with ``AssertionError`` so the framework reports a
    regular test failure rather than an error.
    """

    # Action frame category/code pair that _verify_eml_notif treats as an
    # EML notification.
    _EML_NOTIF_CATEGORY = 37
    _EML_NOTIF_CODE = 6

    # Frame names in the order they must appear within one exchange.
    _FRAME_SEQUENCE = ('MU-RTS', 'CTS', 'Ping', 'ACK')

    # Bytes of the Multi-Link element body read by _validate_ml_element
    # (the last field checked starts at offset 18 and is 6 bytes long).
    _ML_ELEMENT_MIN_LEN = 24

    def _validate_ml_element(self, ie, mld_addr, link2_addr, link2_id=2):
        """Assert that a Multi-Link element carries the expected fixed fields.

        The element body is walked with hard-coded offsets, so only the exact
        layout expected by this test is accepted.

        Args:
            ie: parsed element; only ``ie.info`` (an indexable byte sequence)
                is read.
            mld_addr: colon-separated MAC address expected after the control
                and common-info-length bytes.
            link2_addr: colon-separated MAC address expected in the STA info.
            link2_id: link ID expected in the low nibble of the STA control.

        Raises:
            AssertionError: on the first field that does not match, or when
                the element is shorter than the fields that are read.
        """
        info = ie.info
        slash.logger.info(f'ie={str(info)}')
        slash.logger.info(type(info))
        # Fail with a readable message instead of an IndexError further down.
        assert len(info) >= self._ML_ELEMENT_MIN_LEN, (
            f'Multi Link element too short: expected at least '
            f'{self._ML_ELEMENT_MIN_LEN} bytes, got {len(info)}'
        )

        control = [0x80, 0x01]
        for i, expected in enumerate(control):
            assert expected == info[i], \
                f'Multi Link control mismatch in byte {i}: expected {expected}, got {info[i]}'
        # Skip the control bytes and the common info length.
        pos = 3

        mld_addr_hex = [int(byte, 16) for byte in mld_addr.split(':')]
        slash.logger.info(f'mld_addr_hex={mld_addr_hex}')
        for i, expected in enumerate(mld_addr_hex):
            # Report the byte that was actually compared (offset pos + i).
            assert expected == info[pos + i], \
                f'Multi Link mac address mismatch: expected {expected}, got {info[pos + i]}'
        # Skip the MAC address.
        pos += 6

        eml_capab = [0x33, 0x00]
        for i, expected in enumerate(eml_capab):
            assert expected == info[pos + i], \
                f'Multi Link capabilities mismatch: expected {expected}, got {info[pos + i]}'
        # Skip the EML capabilities.
        pos += 2

        assert (info[pos] & 0xf) == 0x0, \
            f'Multi Link capabilities and operations mismatch: expected 0x00, got {info[pos]}'
        # Skip the capabilities-and-operations field.
        pos += 2

        # TODO: verify that the subelement exists, and that there is only one.
        assert info[pos] == 0x00, \
            f'Multi Link: wrong subelement ID: expected 0x00, got {info[pos]}'
        # Skip the subelement ID and subelement length.
        pos += 2

        sta_link_id = info[pos] & 0xf
        assert sta_link_id == link2_id, \
            f'Multi Link STA control: wrong link ID. Expected {link2_id}, got {sta_link_id}'
        # Skip the STA control and the STA info length.
        pos += 3

        link2_addr_hex = [int(byte, 16) for byte in link2_addr.split(':')]
        slash.logger.info(f'link2_addr_hex={link2_addr_hex}')
        for i, expected in enumerate(link2_addr_hex):
            assert expected == info[pos + i], \
                f'Multi Link STA Info: wrong link addres. Expected {expected}, got {info[pos + i]}'

    def _verify_eml_notif(self, link, snif, mode, links_bitmap):
        """Assert that the capture holds EML notifications with the given state.

        Frames are selected with a display filter (subtype 0xd, transmitter
        ``link.addr``, receiver ``link.bssid``) and then narrowed down to the
        EML notification action category/code. Every notification found must
        match ``mode`` and ``links_bitmap``.

        Args:
            link: link object providing ``addr`` and ``bssid``.
            snif: sniffer providing ``get_packets`` and ``parse_wifi_frame``.
            mode: expected value of bit 0 of the EML control byte.
            links_bitmap: expected value of bits 2-7 of the EML control byte,
                shifted down by two.

        Raises:
            AssertionError: if no notification is found or one mismatches.
        """
        slash.logger.info('Check if EML notification is valid')
        frame_type = '(wlan.fc.type_subtype == 0xd)'
        ta = f'(wlan.ta == {link.addr})'
        ra = f'(wlan.ra == {link.bssid})'
        capture_filter = f'{frame_type} && {ta} && {ra}'

        found_eml = False
        for _timestamp, rawdata in snif.get_packets(capture_filter):
            _tap, packet = snif.parse_wifi_frame(rawdata)
            # Frames the parser did not decode as action frames are ignored.
            if not hasattr(packet, 'action'):
                continue
            if (packet.action.category != self._EML_NOTIF_CATEGORY
                    or packet.action.code != self._EML_NOTIF_CODE):
                continue

            found_eml = True
            eml_control = packet.mgmt.data[3]
            emlsr_mode = eml_control & 1
            assert emlsr_mode == mode, \
                f'Wrong emlsr mode. Expected {mode}, got {emlsr_mode}'
            links = (eml_control & 0xfc) >> 2
            assert links == links_bitmap, \
                f'Wrong link bitamp. Expected {links_bitmap}, got {links}'
        assert found_eml, 'EML Notification not found'

    def _get_links_info(self, hb_link_id='5'):
        """Return the station's links as a ``(hb_link, lb_link)`` pair.

        Args:
            hb_link_id: ``link_id`` string of the link returned first; any
                other link reported by the supplicant is returned second.

        Raises:
            AssertionError: if either link is missing from the link status.
        """
        hb_link = None
        lb_link = None
        for lnk in self.resources.sta.wifi.station.wpa.get_links_status():
            if lnk.link_id == hb_link_id:
                hb_link = lnk
            else:
                lb_link = lnk
        # The original code hit UnboundLocalError here when a link was absent.
        assert hb_link is not None, f'No link with link_id {hb_link_id} was reported'
        assert lb_link is not None, f'No link other than link_id {hb_link_id} was reported'
        return hb_link, lb_link

    @staticmethod
    def _get_frame_types_filters(bssid, link_addr):
        """Build one predicate per frame name of ``_FRAME_SEQUENCE``.

        Args:
            bssid: link BSSID as lowercase hex digits without separators.
            link_addr: station link address in the same format.

        Returns:
            dict mapping frame name to a callable taking a parsed frame and
            returning whether the frame is of that kind for this link.
        """
        # NOTE(review): assumes the project's dpkt build exports C_TRIGGER next
        # to IE_EXTENSION; imported here so a missing name only affects this
        # check - confirm and move to the module imports.
        from dpkt.ieee80211 import (C_ACK, C_CTS, C_TRIGGER, CTL_TYPE,
                                    D_QOS_DATA, DATA_TYPE)

        def as_hex(raw_addr):
            return bytes(raw_addr).hex()

        # NOTE(review): every matcher reads addresses from `pkt.trigger`, as
        # the original chain did, including the CTS/QoS-data/ACK ones - confirm
        # the parser exposes that attribute for non-trigger frames.
        return {
            'MU-RTS': lambda pkt: (
                pkt.type == CTL_TYPE and pkt.subtype == C_TRIGGER
                and as_hex(pkt.trigger.src) == bssid
                and as_hex(pkt.trigger.dst) == 'ffffffffffff'),
            'CTS': lambda pkt: (
                pkt.type == CTL_TYPE and pkt.subtype == C_CTS
                and as_hex(pkt.trigger.dst) == bssid),
            'Ping': lambda pkt: (
                pkt.type == DATA_TYPE and pkt.subtype == D_QOS_DATA
                and as_hex(pkt.trigger.src) == bssid
                and as_hex(pkt.trigger.dst) == link_addr),
            'ACK': lambda pkt: (
                pkt.type == CTL_TYPE and pkt.subtype == C_ACK
                and as_hex(pkt.trigger.dst) == bssid),
        }

    def _verify_frames_order(self, found_dict, frame):
        """Assert that every frame listed before ``frame`` was already seen.

        Args:
            found_dict: mapping of frame name to a "seen" flag, in the order
                the frames are expected to appear.
            frame: name of the frame that was just found.

        Raises:
            AssertionError: if an earlier frame of the sequence is not seen.
        """
        # Iterate (name, flag) pairs; iterating the dict itself yields only
        # the keys and cannot be unpacked into two names.
        for framename, found in found_dict.items():
            if framename == frame:
                break
            assert found, f'Found {frame} without {framename}'

    def _verify_frame_sequences(self, snif, link):
        """Check the order of MU-RTS, CTS, Ping and ACK frames on one link.

        Walks the whole capture of ``snif``; each recognised frame must be
        preceded by the earlier frames of ``_FRAME_SEQUENCE``. Tracking is
        reset after every ACK.
        """
        bssid = link.bssid.replace(':', '').lower()
        link_addr = link.addr.replace(':', '').lower()
        filters = self._get_frame_types_filters(bssid, link_addr)
        last_frame = self._FRAME_SEQUENCE[-1]

        found = dict.fromkeys(self._FRAME_SEQUENCE, False)
        for _timestamp, rawdata in snif.get_packets():
            _tap, pkt = snif.parse_wifi_frame(rawdata)
            for frame_type in self._FRAME_SEQUENCE:
                if not filters[frame_type](pkt):
                    continue
                self._verify_frames_order(found, frame_type)
                found[frame_type] = True
                if frame_type == last_frame:
                    # Exchange complete, start tracking the next one.
                    found = dict.fromkeys(self._FRAME_SEQUENCE, False)
                break

    def _run_tx_traffic(self, ap, sta):
        """Send UDP iperf traffic from the AP to the station and check loss."""
        slash.logger.info("Running Tx traffic")
        with IperfSession(ap, sta, sta.wifi.ipaddr, 5) as iperf_session:
            iperf_session.parameters.send_udp_traffic = True
            iperf_session.parameters.port = 5555
            iperf_session.parameters.bandwidth_limit = "10M"
            iperf_session.start()
            iperf_session.wait()
            iperf_session.print_output()
            results = iperf_session.get_results()
            assert results.loss_rate <= 80.0, "too high packet loss"

    @ap_running("ap", channel=1, hw_mode='g',
                country_code="FI",
                ieee80211n='1',
                ieee80211ax='1',
                ieee80211be='1',
                ieee80211d='1',
                ieee80211h='1',
                ht_capab=ht_20_capab,
                mld_ap='1',
                mld_id='0',
                mld_link_id='2')
    @ap_mld_link("ap",
                 channel=36,
                 hw_mode='a',
                 ieee80211ac='1',
                 ieee80211ax='1',
                 ieee80211be='1',
                 ieee80211n='1',
                 ht_capab=ht_40_capab,
                 vht_oper_chwidth=1,
                 vht_oper_centr_freq_seg0_idx=42,
                 he_oper_chwidth=1,
                 he_oper_centr_freq_seg0_idx=42,
                 eht_oper_chwidth=1,
                 eht_oper_centr_freq_seg0_idx=42,
                 mld_ap='1',
                 mld_id='0',
                 mld_link_id='5')
    @ip_connectivity()
    def test_two_links_connection(self):
        """Connect over two links, toggle the second link and check captures.

        Flow: start one sniffer per channel, connect, run traffic, write
        ``0x24`` then ``0x20`` to the ``active_links`` debugfs entry, stop the
        sniffers and verify the EML notifications and the per-link frame
        order found in the captures.
        """
        sta = self.resources.sta
        ap = self.resources.ap
        snif1 = self.resources.sniffer1.wifi.legacy_sniffer
        snif2 = self.resources.sniffer2.wifi.legacy_sniffer
        sniffers = [snif1, snif2]

        snif1.start_sniffer("link1.pcap", channel=1)
        snif2.start_sniffer("link2.pcap", channel=36)
        try:
            # Set HB as preferred association link
            sta.wifi.station.wpa.set("mld_connect_band_pref", "2")
            sta.wifi.station.wpa.set("p2p_disabled", "1")
            connection_success = sta.wifi.station.connect(ap.wifi.soft_ap)
            assert connection_success, "Station failed to connect to the AP"
            time.sleep(3)

            # Get links info; the order matches `sniffers` (channel 1, then 36).
            hb_link, lb_link = self._get_links_info()
            links = [lb_link, hb_link]
            slash.logger.info(f'hb_link = {hb_link}')
            slash.logger.info(f'lb_link = {lb_link}')

            # TODO: disabled checks, kept for reference until re-enabled.
            # # Check that Probe request contains an EHT element
            # slash.logger.info('Checker: verify that the probe request has an EHT Capability')
            # frame_type = '(wlan.fc.type == 0 && wlan.fc.subtype == 4)'
            # sta_address = sta.wifi.station.wpa.status().address
            # ta = f'wlan.ta == {sta_address}'
            # found_probe = False
            # for packet in snif2.get_packets(f'{frame_type} && {ta}', json_format=True):
            #     found_probe = True
            #     assert packet_analyzer.is_eht_supported(packet), \
            #         'Probe request does not have EHT Capabilities element'
            # assert found_probe, 'Probe request was not found'
            # # Check assoc req has ML elem
            # slash.logger.info('Checker: verify that assoc request contains an ML element')
            # found_assoc_req = False
            # frame_type = '(wlan.fc.type_subtype == 0)'
            # ta = f'wlan.ta == {hb_link.addr}'
            # ra = f'wlan.ra == {hb_link.bssid}'
            # for timestamp, rawdata in snif2.get_packets(f'{frame_type} && {ta} && {ra}'):
            #     found_assoc_req = True
            #     tap, packet = snif2.parse_wifi_frame(rawdata)
            #     for ie in packet.ies:
            #         if ie.id == IE_EXTENSION and ie.eid_extension == EID_EXT_EHT_MULTI_LINK:
            #             self._validate_ml_element(ie, sta_address, lb_link.addr)
            #             break
            #     else:
            #         assert False, 'Assoc req does not contain a ML element'
            # assert found_assoc_req, 'Assoc Request was not found'

            # Open BA session by sending a large amount of data
            # ping = Ping(ap, sta.wifi.ipaddr, slash.logger)
            # ping.run(count=20, timeout=60, ping_loss_threshold=100)
            self._run_tx_traffic(ap, sta)

            slash.logger.info('Wait 3 seconds for the sniffers to start')
            time.sleep(3)

            slash.logger.info("Activate second link to enter eSR mode")
            active_links_path = sta.wifi.debugfs.get_wifi_debugfs_paths(
                'active_links', all_paths=True)[0]
            # 0x24 has bits 2 and 5 set; presumably one bit per link ID
            # (the AP links are configured with mld_link_id '2' and '5').
            sta.wifi.debugfs.write(active_links_path, '0x24')
            slash.logger.info(f'active links is = {sta.wifi.debugfs.read(active_links_path)}')
            slash.logger.info('Wait 3 seconds for eSR to start')
            time.sleep(3)

            slash.logger.info("Configure AP to send MU-RTS to both links")
            aid = ap.wifi.soft_ap.sta_aid(sta.wifi.station.macaddr_hex)
            slash.logger.info(f'AID = {aid}')
            slash.logger.info("Set HE AID in sniffer and sleep (1 sec)")
            for snif, link in zip(sniffers, links):
                snif.set_he_aid(aid, link.bssid)
            time.sleep(1)

            # TODO: disabled trigger-frame command, kept for reference.
            # for idx, link in enumerate(links):
            #     slash.logger.info(f'send cmd for {idx}: link_addr= {link.addr} link_bssid = {link.bssid}')
            #     ap.wifi.soft_ap.ul_periodic_trigger_frame(sta.wifi.station.macaddr_hex, mld=True,
            #                                               link_addr=link.addr, link_bssid=link.bssid,
            #                                               link_id=idx, periodic_msec=0, trig_type=6)

            # NOTE(review): the log announces 3 seconds but the sleep is 1 -
            # confirm which one is intended.
            slash.logger.info('Wait 3 seconds for the trigger frame cmd to be sent')
            time.sleep(1)
            # ping.run(count=20, timeout=60, ping_loss_threshold=100)

            slash.logger.info('Deactivate 2nd links')
            sta.wifi.debugfs.write(active_links_path, '0x20')
            slash.logger.info('Wait 3 seconds for eSR deactivation')
            time.sleep(3)
        finally:
            # Stop capturing even when a step above failed, so the sniffers
            # are not left running for the next test.
            for snif in sniffers:
                snif.stop_sniffer()
        time.sleep(3)

        # NOTE(review): both EML checks scan the whole snif2 capture and
        # _verify_eml_notif asserts on every notification it finds, so the
        # enter and exit notifications may fail each other - confirm the
        # intended filtering.
        # Check that an EML Notification was sent upon entering eSR
        self._verify_eml_notif(hb_link, snif2, 1, 0x24)

        # Check that we have sequences of MU-RTS->CTS->PING->ACK on the same link
        for link, snif in zip(links, sniffers):
            self._verify_frame_sequences(snif, link)

        # Check that an EML Notification was sent upon exiting eSR
        self._verify_eml_notif(hb_link, snif2, 0, 0x20)
        sta.wifi.station.disconnect()