Untitled

 avatar
unknown
python
2 years ago
15 kB
8
Indexable
#
# INTEL CONFIDENTIAL
#
# Copyright (C) 2022 Intel Corporation
#
# This software and the related documents are Intel copyrighted materials, and
# your use of them is governed by the express license under which they were
# provided to you (License). Unless the License provides otherwise, you may not
# use, modify, copy, publish, distribute, disclose or transmit this software or
# the related documents without Intel's prior written permission.
#
# This software and the related documents are provided as is, with no express or
# implied warranties, other than those that are expressly stated in the License.
#
from base.pytm_base_test import PytmBaseTest
from libs.wifi.utils.decorators import ap_running, sta_connected, ip_connectivity, ap_mld_link, legacy_air_sniffer
from tests.wifi.kpis.performance.common.ap.soft_ap_config import ht_20_capab, ht_40_capab
from libs.common.utils.event_waiters.ewait_base import KernelWaiter
from libs.wifi.utils.packet_analyzer import packet_analyzer
from dpkt.ieee80211 import IE_EXTENSION
from libs.common.utils.iperf_session.single import IperfSession
from libs.common.utils.ping.ping import Ping
import re
import slash
import time
import json

EID_EXT_EHT_CAPA = 108
EID_EXT_EHT_MULTI_LINK = 107


class LinuxWiFi7Tests(PytmBaseTest):
    def _validate_ml_element(self, ie, mld_addr, link2_addr):
        slash.logger.info(f'ie={str(ie.info)}')
        slash.logger.info(type(ie.info))
        control = [0x80, 0x01]
        pos = 0

        for i in range(0, 2):
            assert control[i] == ie.info[i], \
                f'Multi Link control mismatch in byte {i}: expected {control[i]}, got {ie.info[i]}'
        
        # control and common info length
        pos += 3 

        mld_addr_hex = [int(byte, 16) for byte in mld_addr.split(':')]
        slash.logger.info(f'mld_addr_hex={mld_addr_hex}')
        for i in range(0, 6):
            assert mld_addr_hex[i] == ie.info[pos + i], \
                f'Multi Link mac address mismatch: expected {mld_addr_hex[i]}, got {ie.info[i]}'        
        #mac address
        pos += 6


        eml_capab = [0x33, 0x00]
        for i in range(0, 2):
            assert eml_capab[i] == ie.info[pos + i], \
                f'Multi Link capabilities mismatch: expected {eml_capab[i]}, got {ie.info[pos+i]}'
        # eml_capab
        pos += 2

        assert ie.info[pos] & 0xf == 0x0, \
            f'Multi Link capabilities and operations mismatch: expected 0x00, got {ie.info[pos]}'
        # eml_cpab_n_oper
        pos += 2

        # todo: verify that the subelement exists, and that thre is only one.
        assert ie.info[pos] == 0x00, \
            f'Multi Link: wrong subelement ID: expected 0x00, got {ie.info[pos]}'
        # Subelement ID and Subelemnt length
        pos += 2

        sta_control_start = ie.info[pos]
        assert sta_control_start & 0xf == 2, \
            f'Multi Link STA control: wrong link ID. Expected 2, got {sta_control_start & 0xf}'
        # Sta Control and Length of Sta Info
        pos += 3

        link2_addr_hex = [int(byte, 16) for byte in link2_addr.split(':')]
        slash.logger.info(f'link2_addr_hex={link2_addr_hex}')
        for i in range(0, 6):
            assert link2_addr_hex[i] == ie.info[pos + i], \
                f'Multi Link STA Info: wrong link addres. Expected {link2_addr_hex[i]}, got {ie.info[pos + i]}'

    def _verify_eml_notif(self, link, snif, mode, links_bitmap):
        slash.logger.info('Check if EML notification is valid')

        frame_type = '(wlan.fc.type_subtype == 0xd)'
        ta = f'(wlan.ta == {link.addr})'
        ra = f'(wlan.ra == {link.bssid})'
        filter = f'{frame_type} && {ta} && {ra}'
        found_eml = False

        for timestamp, rawdata in snif.get_packets(filter):    
            tap, packet = snif.parse_wifi_frame(rawdata)

            if hasattr(packet, "action"):
                if packet.action.category != 37 or packet.action.code != 6:
                    continue
                
                found_eml = True
                eml_control = packet.mgmt.data[3]
                emlsr_mode = eml_control & 1
                assert emlsr_mode == mode, \
                    f'Wrong emlsr mode. Expected {mode}, got {emlsr_mode}'

                links_msk = eml_control & 0xfc
                links = links_msk >> 2
                assert links == links_bitmap, \
                    f'Wrong link bitamp. Expected {links_bitmap}, got {links}'
        assert found_eml, 'EML Notification not found'

    def _get_links_info(self):
        sta = self.resources.sta
        links = sta.wifi.station.wpa.get_links_status()
        for lnk in links:
            if lnk.link_id == '5':
                hb_link = lnk
            else:
                lb_link = lnk
        return hb_link, lb_link

    """
    def _get_frame_types_filters(bssid, link_addr):
        return { \
            'MU-RTS': \
            lambda pkt: pkt.type == C_CTL_TYPE and pkt.subtype == C_TRIGGER and \
                binascii.hexlify(pkt.trigger.src).decode() == bssid and \
                binascii.hexlify(pkt.trigger.dst).decode() == "ffffffffffff", \
            'CTS': \
            lambda pkt: pkt.type == CTL_TYPE and pkt.subtype == C_CTS and \
                binascii.hexlify(pkt.trigger.dst).decode() == bssid, \
            'Ping': \
            lambda pkt: pkt.type == DATA_TYPE and pkt.subtype == D_QOS_DATA and \
                binascii.hexlify(pkt.trigger.src).decode() == bssid and \
                binascii.hexlify(pkt.trigger.dst).decode() == link_addr, \
            'ACK': \
            lambda pkt: packet.type == CTL_TYPE and packet.subtype == C_ACK and \
                binascii.hexlify(packet.trigger.dst).decode() == bssid \
            }
    """

    def _verify_frames_order(self, found_dict, frame):
        for framename, found in found_dict:
            if framename == frame:
                break
            assert found, f'Found {frame} without {framename}'

    @ap_running("ap", channel=1, hw_mode='g',
                country_code="FI",
                ieee80211n='1',
                ieee80211ax='1',
                ieee80211be='1',
                ieee80211d='1',
                ieee80211h='1',
                ht_capab=ht_20_capab,
                mld_ap='1',
                mld_id='0',
                mld_link_id='2')
    @ap_mld_link("ap",
                 channel=36,
                 hw_mode='a',
                 ieee80211ac='1',
                 ieee80211ax='1',
                 ieee80211be='1',
                 ieee80211n='1',
                 ht_capab=ht_40_capab,
                 vht_oper_chwidth=1,
                 vht_oper_centr_freq_seg0_idx=42,
                 he_oper_chwidth=1,
                 he_oper_centr_freq_seg0_idx=42,
                 eht_oper_chwidth=1,
                 eht_oper_centr_freq_seg0_idx=42,
                 mld_ap='1',
                 mld_id='0',
                 mld_link_id='5')
    @ip_connectivity()
    def test_two_links_connection(self):
        sta = self.resources.sta
        ap = self.resources.ap
        snif1 = self.resources.sniffer1.wifi.legacy_sniffer
        snif2 = self.resources.sniffer2.wifi.legacy_sniffer
        sniffers = [snif1, snif2]
        snif1.start_sniffer("link1.pcap", channel=1)
        snif2.start_sniffer("link2.pcap", channel=36)

        # Set HB as prefered Association link
        sta.wifi.station.wpa.set("mld_connect_band_pref", "2")
        sta.wifi.station.wpa.set("p2p_disabled", "1")

        connection_success = sta.wifi.station.connect(ap.wifi.soft_ap)
        assert connection_success, "Station failed to connect to the AP"

        #snif1.stop_sniffer()
        #snif2.stop_sniffer()
        time.sleep(3)

        # Get links info
        (hb_link, lb_link) = self._get_links_info()
        links = [lb_link, hb_link]

        slash.logger.info(f'hb_link = {hb_link}')
        slash.logger.info(f'lb_link = {lb_link}')
        '''   
        # Check that Probe request contains an EHT element      
        slash.logger.info('Checker: verify that the probe request has an EHT Capability')
        frame_type = '(wlan.fc.type == 0 && wlan.fc.subtype == 4)'
        sta_address = sta.wifi.station.wpa.status().address
        ta = f'wlan.ta == {sta_address}'
        found_probe = False
        for packet in snif2.get_packets(f'{frame_type} && {ta}', json_format=True):  
            found_probe = True
            assert packet_analyzer.is_eht_supported(packet), \
                'Probe request does not have EHT Capabilities element'
        assert found_probe, 'Probe request was not found'

        # Check assoc req has ML elem
        slash.logger.info('Checker: verify that assoc request contains an ML element')
        found_assoc_req = False
        frame_type = '(wlan.fc.type_subtype == 0)'
        ta = f'wlan.ta == {hb_link.addr}'
        ra = f'wlan.ra == {hb_link.bssid}'
        for timestamp, rawdata in snif2.get_packets(f'{frame_type} && {ta} && {ra}'):
            found_assoc_req = True
            tap, packet = snif2.parse_wifi_frame(rawdata)
           
            for ie in packet.ies:
                if ie.id == IE_EXTENSION and ie.eid_extension == EID_EXT_EHT_MULTI_LINK:
                    self._validate_ml_element(ie, sta_address, lb_link.addr)
                    break
            else:
                assert False, 'Assoc req does not contain a ML element'
        assert found_assoc_req, 'Assoc Request was not found'
        '''
        # Open BA session by sending a large anount of data
        #ping = Ping(ap, sta.wifi.ipaddr, slash.logger)
        #ping.run(count=20, timeout=60, ping_loss_threshold=100)

        slash.logger.info("Running Tx traffic")
        with IperfSession(ap, sta, sta.wifi.ipaddr, 5) as iperf_session:
            iperf_session.parameters.send_udp_traffic = True
            iperf_session.parameters.port = 5555
            iperf_session.parameters.bandwidth_limit = "10M"

            iperf_session.start()
            iperf_session.wait()
            iperf_session.print_output()
            results = iperf_session.get_results()
            assert results.loss_rate <= 80.0, "too high packet loss"

        #snif1.start_sniffer("link1.pcap", channel=1)
        #snif2.start_sniffer("link2.pcap", channel=36)

        slash.logger.info('Wait 3 seconds for the sniffers to start')
        time.sleep(3)

        slash.logger.info("Activate second link to enter eSR mode")
        valid_links_path = sta.wifi.debugfs.get_wifi_debugfs_paths('valid_links', all_paths=True)[0]
        active_links_path = sta.wifi.debugfs.get_wifi_debugfs_paths('active_links', all_paths=True)[0]

        sta.wifi.debugfs.write(active_links_path, '0x24')
        slash.logger.info(f'active links is = {sta.wifi.debugfs.read(active_links_path)}')
        slash.logger.info('Wait 3 seconds for eSR to start')
        time.sleep(3)

        slash.logger.info("Configure AP to send MU-RTS to both links")
        aid = ap.wifi.soft_ap.sta_aid(sta.wifi.station.macaddr_hex)
        slash.logger.info(f'AID = {aid}')
        slash.logger.info("Set HE AID in sniffer and sleep (1 sec)")

        for snif, link in zip(sniffers, links):
            snif.set_he_aid(aid, link.bssid)

        time.sleep(1)
        '''
        for idx, link in enumerate(links):
            slash.logger.info(f'send cmd for {idx}: link_addr= {link.addr} link_bssid = {link.bssid}')
            ap.wifi.soft_ap.ul_periodic_trigger_frame(sta.wifi.station.macaddr_hex, mld=True,
                                                      link_addr=link.addr, link_bssid=link.bssid,
                                                      link_id=idx, periodic_msec=0, trig_type=6)
        '''
        slash.logger.info('Wait 3 seconds for the trigger frame cmd to be sent')
        time.sleep(1)


        #ping.run(count=20, timeout=60, ping_loss_threshold=100)

        assert False

        slash.logger.info('Deactivate 2nd links')
        sta.wifi.debugfs.write(active_links_path, '0x20')
        slash.logger.info('Wait 3 seconds for eSR deactivation')
        time.sleep(3)

        snif1.stop_sniffer()
        snif2.stop_sniffer()
        time.sleep(3)

        # Check that an EML Notifications was sent upon entering eSR
        self._check_eml_notif(hb_link, snif2, 1, 0x24)
        
        # Check that we have sequences of MU-RTS->CTS->PING->ACK on the same link
        frames = ['MU-RTS', 'CTS', 'Ping', 'ACK']

        for link, snif in zip(links, sniffers):
            found = {frame: False for frame in frames}
            bssid = link.bssid.replace(':', '')
            link_addr = link.addr.replace(':', '')
            #filters = self._get_frame_types_filters(bssid, link_addr)

            for timestamp, rawdata in snif.get_packets():
                tap, pkt = snif.parse_wifi_frame(rawdata)
                
                """
                for frame_type in frames:
                    if filters[frame_type](pkt):
                        self._verify_frames_order(found, frame_type)
                        found[frame_type] = True
                        
                        if frame_type == 'ACK':
                            found = {frame: False for frame in frames}
                        
                        break
                """
                
                if pkt.type == CTL_TYPE and pkt.subtype == C_TRIGGER and \
                    binascii.hexlify(pkt.trigger.src).decode() == bssid and \
                    binascii.hexlify(pkt.trigger.dst).decode() == "ffffffffffff":
                        found['MU-RTS'] = True


                elif pkt.type == CTL_TYPE and pkt.subtype == C_CTS and \
                    binascii.hexlify(pkt.trigger.dst).decode() == bssid:
                        self._verify_frames_order(found, 'CTS')
                        found['CTS'] = True
                
                elif pkt.type == DATA_TYPE and pkt.subtype == D_QOS_DATA and \
                    binascii.hexlify(pkt.trigger.src).decode() == bssid and \
                    binascii.hexlify(pkt.trigger.dst).decode() == link_addr:
                        self._verify_frames_order(found, 'Ping')
                        found['Ping'] = True

                elif pkt.type == CTL_TYPE and pkt.subtype == C_ACK and \
                    binascii.hexlify(pkt.trigger.dst).decode() == bssid:
                        self._verify_frames_order(found, 'ACK')
                        found['ACK'] = True
                        found = {frame: False for frame in frames}


        # Check that a EML Notification was sent upon exiting eSR
        self._verify_eml_notif(hb_link, snif2, 0, 0x20)

        sta.wifi.station.disconnect()

        assert False
Editor is loading...