Untitled

 avatar
unknown
python
2 months ago
5.1 kB
2
Indexable
import asyncio
import random
import logging
from enum import Enum

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Test configuration
MAX_CONCURRENT_CALLS = 100
TOTAL_CALLS = 1000
CALL_DURATION_RANGE = (10, 60)  # Random duration between 10 and 60 seconds

class CallState(Enum):
    INITIAL = 0
    INVITED = 1
    ANSWERED = 2
    PLAYING = 3
    WAITING_INPUT = 4
    PROCESSING = 5
    TERMINATED = 6

class MockCall:
    def __init__(self, call_id):
        self.call_id = call_id
        self.state = CallState.INITIAL

    async def invite(self):
        await asyncio.sleep(0.1)  # Simulate network delay
        self.state = CallState.INVITED
        logging.debug(f"Call {self.call_id} invited")

    async def answer(self):
        await asyncio.sleep(0.1)  # Simulate answer delay
        self.state = CallState.ANSWERED
        logging.debug(f"Call {self.call_id} answered")

    async def play(self, audio_file):
        self.state = CallState.PLAYING
        play_duration = random.uniform(1, 5)  # Simulate variable audio length
        await asyncio.sleep(play_duration)
        logging.debug(f"Call {self.call_id} played {audio_file} for {play_duration:.2f} seconds")

    async def wait_for_input(self):
        self.state = CallState.WAITING_INPUT
        input_delay = random.uniform(0.5, 3)  # Simulate user thinking time
        await asyncio.sleep(input_delay)
        dtmf = random.choice("0123456789*#")
        logging.debug(f"Call {self.call_id} received input: {dtmf}")
        return dtmf

    async def process(self):
        self.state = CallState.PROCESSING
        await asyncio.sleep(0.1)  # Simulate processing time
        logging.debug(f"Call {self.call_id} processed input")

    async def hangup(self):
        await asyncio.sleep(0.1)  # Simulate network delay
        self.state = CallState.TERMINATED
        logging.debug(f"Call {self.call_id} terminated")

class EnhancedMockStressTest:
    def __init__(self):
        self.active_calls = set()
        self.total_calls_made = 0
        self.successful_calls = 0
        self.failed_calls = 0
        self.max_concurrent = 0

    async def simulate_ivr_flow(self, call):
        try:
            await call.invite()
            await call.answer()

            # Simulate welcome message
            await call.play("welcome.wav")

            # Main menu loop
            for _ in range(random.randint(1, 3)):  # Simulate 1-3 menu interactions
                await call.play("menu_options.wav")
                user_input = await call.wait_for_input()
                await call.process()

                if user_input == "0":  # Simulate exit condition
                    break

            # Simulate goodbye message
            await call.play("goodbye.wav")

            await call.hangup()
            self.successful_calls += 1

        except Exception as e:
            logging.error(f"Call {call.call_id} failed: {str(e)}")
            self.failed_calls += 1
        finally:
            if call in self.active_calls:
                self.active_calls.remove(call)

    async def make_call(self):
        call = MockCall(f"stress_test_{self.total_calls_made}")
        self.total_calls_made += 1
        self.active_calls.add(call)

        # Update max concurrent calls
        current_concurrent = len(self.active_calls)
        if current_concurrent > self.max_concurrent:
            self.max_concurrent = current_concurrent

        await self.simulate_ivr_flow(call)

    async def run_test(self):
        tasks = []

        async def call_manager():
            while len(tasks) < TOTAL_CALLS:
                if len(self.active_calls) < MAX_CONCURRENT_CALLS:
                    task = asyncio.create_task(self.make_call())
                    tasks.append(task)
                await asyncio.sleep(0.1)

        async def log_status():
            while tasks or self.active_calls:
                logging.info(f"Active calls: {len(self.active_calls)}, "
                             f"Total calls made: {self.total_calls_made}, "
                             f"Successful: {self.successful_calls}, "
                             f"Failed: {self.failed_calls}, "
                             f"Max concurrent: {self.max_concurrent}")
                await asyncio.sleep(5)

        await asyncio.gather(
            call_manager(),
            log_status()
        )

        # Wait for all calls to finish
        await asyncio.gather(*tasks)

        logging.info("Test completed.")
        logging.info(f"Total calls made: {self.total_calls_made}")
        logging.info(f"Successful calls: {self.successful_calls}")
        logging.info(f"Failed calls: {self.failed_calls}")
        logging.info(f"Max concurrent calls: {self.max_concurrent}")

async def main():
    test = EnhancedMockStressTest()
    await test.run_test()

if __name__ == "__main__":
    asyncio.run(main())
Leave a Comment