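# Untitled paste (author: unknown, format: plain_text, size: 3.5 kB, posted 4 months ago).
# The lines above the imports were paste-site page text, not part of the program.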
import asyncio
import time
import numpy as np
from ctypes.wintypes import HWND
from utils import get_direct3d_device
from winsdk.windows.graphics.capture.interop import create_for_window
from winsdk.windows.graphics.capture import (
    Direct3D11CaptureFramePool,
    GraphicsCaptureSession,
)
from winsdk.windows.graphics.directx import DirectXPixelFormat
from winsdk.windows.graphics.imaging import (
    SoftwareBitmap,
    BitmapBufferAccessMode
)


class Screenshot_WindowsGraphicsCapture:
    """Continuously capture a window via the Windows.Graphics.Capture API.

    The frame pool and capture session are created once in ``__init__``.
    Frames delivered by the (free-threaded) frame pool are handed over to the
    asyncio event loop that consumes :meth:`frames`, converted to numpy
    arrays and yielded to the caller.

    An instance supports a single capture run: :meth:`stop` closes the
    session and the frame pool, and they are not recreated.
    """

    def __init__(self, hwnd: HWND) -> None:
        """Prepare a capture session for the window identified by *hwnd*."""
        self.device = get_direct3d_device()
        self.item = create_for_window(hwnd)

        # Create frame pool and session once.
        self.frame_pool = Direct3D11CaptureFramePool.create_free_threaded(
            self.device,
            DirectXPixelFormat.B8_G8_R8_A8_UINT_NORMALIZED,
            2,  # buffer count > 1 for smoother capture
            self.item.size,
        )
        self.session: GraphicsCaptureSession = self.frame_pool.create_capture_session(self.item)
        self.session.is_border_required = False
        self.session.is_cursor_capture_enabled = False

        # Queue that carries frames from the capture callback to frames().
        self._frame_queue = asyncio.Queue()

        # Event loop that owns the queue.  The frame-arrived callback runs on
        # a thread that has no event loop of its own, so the loop must be
        # remembered here (or in frames()) instead of being looked up from
        # inside the callback.
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            # Constructed outside a coroutine; frames() sets the loop later.
            self._loop = None

        self.frame_pool.add_frame_arrived(self._on_frame_arrived)

    def _on_frame_arrived(self, frame_pool, event_args) -> None:
        """Forward a newly captured frame to the consuming event loop."""
        frame = frame_pool.try_get_next_frame()
        if not frame:
            return

        loop = self._loop
        if loop is None:
            # Nobody is consuming yet: give the buffer back to the pool.
            frame.close()
            return

        try:
            # asyncio.Queue is not thread-safe; schedule the put on the loop.
            loop.call_soon_threadsafe(self._frame_queue.put_nowait, frame)
        except RuntimeError:
            # The loop has already been closed; drop the frame.
            frame.close()

    def start(self):
        """Start delivering frames to the frame pool."""
        self.session.start_capture()

    def stop(self):
        """Close the capture session and its frame pool."""
        self.session.close()
        self.frame_pool.close()

    @staticmethod
    def _bitmap_to_array(software_bitmap) -> np.ndarray:
        """Copy the pixels of *software_bitmap* into a ``(H, W, 4)`` uint8 array."""
        buffer = software_bitmap.lock_buffer(BitmapBufferAccessMode.READ)
        try:
            reference = buffer.create_reference()
            try:
                # Use the bitmap's own dimensions: the capture item's size
                # can change when the window is resized, while the surface
                # keeps the size the frame pool was created with.
                shape = (software_bitmap.pixel_height, software_bitmap.pixel_width, 4)
                # Copy so the array stays valid after the buffer is released.
                image = np.frombuffer(reference, dtype=np.uint8).reshape(shape).copy()
            finally:
                reference.close()
        finally:
            buffer.close()
        return image

    def _discard_pending_frames(self) -> None:
        """Close frames that were queued but never consumed."""
        while True:
            try:
                leftover = self._frame_queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            leftover.close()

    async def frames(self):
        """Async generator that yields continuous frames as numpy arrays (BGRA).

        Each yielded array is an independent copy with shape
        ``(height, width, 4)`` and dtype ``uint8``, so it remains valid after
        the next frame is requested.  Capture is started on first iteration
        and stopped when the generator finishes or is closed.
        """
        # The callback needs the loop that is actually awaiting the queue.
        self._loop = asyncio.get_running_loop()
        self.start()
        try:
            while True:
                frame = await self._frame_queue.get()
                try:
                    software_bitmap = await SoftwareBitmap.create_copy_from_surface_async(frame.surface)
                finally:
                    # The surface has been copied (or the copy failed);
                    # return the buffer to the pool either way.
                    frame.close()

                try:
                    image = self._bitmap_to_array(software_bitmap)
                finally:
                    software_bitmap.close()

                # All native resources are released before yielding, so a
                # consumer that breaks out of the loop cannot leak them.
                yield image
        finally:
            self.stop()
            self._discard_pending_frames()


async def main():
    """Capture frames from a window and periodically print the average FPS.

    The window handle below is a placeholder and must be replaced with a
    real HWND before running.
    """
    # Replace YOUR_HWND with a valid HWND (e.g. obtained via win32gui.FindWindow).
    YOUR_HWND = ...

    REPORT_INTERVAL = 100  # report throughput once per this many frames
    grabber = Screenshot_WindowsGraphicsCapture(YOUR_HWND)

    captured = 0
    began = time.perf_counter()

    async for _image in grabber.frames():
        captured += 1
        if captured % REPORT_INTERVAL != 0:
            continue

        seconds = time.perf_counter() - began
        average = captured / seconds
        print(f"Captured {captured} frames, Avg FPS: {average:.2f}")

        # The loop runs indefinitely; add a `break` here (for example once
        # `captured` reaches 1000) to stop after a fixed number of frames.

# Script entry point: run the capture demo on a new asyncio event loop when
# this file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())
# End of paste. The trailing page text ("Editor is loading...", "Leave a Comment")
# was not part of the program.