Untitled

 avatar
unknown
plain_text
5 months ago
3.8 kB
3
Indexable
from winsdk.windows.graphics.capture import (
    Direct3D11CaptureFramePool,
    Direct3D11CaptureFrame,
)
import asyncio
import cv2
from winsdk.windows.graphics.capture.interop import create_for_window
from ctypes.wintypes import HWND
from winsdk.windows.graphics.capture import (
    Direct3D11CaptureFramePool,
    Direct3D11CaptureFrame,
)
from winsdk.windows.graphics.directx import DirectXPixelFormat
from winsdk.system import Object
from winsdk.windows.graphics.imaging import (
    SoftwareBitmap,
    BitmapBufferAccessMode,
    BitmapBuffer,
)
import numpy as np
import ctypes
import time
from winsdk.windows.ai.machinelearning import LearningModelDevice, LearningModelDeviceKind
from winsdk.windows.media.capture import MediaCapture


def get_direct3d_device():
    """Return a Direct3D 11 device object for use with the capture frame pool.

    The device is first requested from a ``LearningModelDevice`` created with
    ``LearningModelDeviceKind.DIRECT_X_HIGH_PERFORMANCE``. If that raises or
    yields a falsy value, a ``MediaCapture`` object is initialized and the
    device is read from its ``media_capture_settings`` instead.

    Returns:
        The ``direct3_d11_device`` obtained from whichever source succeeded.

    Raises:
        OSError: If neither source yields a device.
        RuntimeError: Raised by ``asyncio.run`` when the fallback path is
            reached while an event loop is already running in this thread.
    """
    try:
        direct_3d_device = LearningModelDevice(
            LearningModelDeviceKind.DIRECT_X_HIGH_PERFORMANCE
        ).direct3_d11_device
    except Exception:
        # Best effort: any ordinary failure just selects the fallback below.
        # Catching ``Exception`` instead of using a bare ``except`` lets
        # KeyboardInterrupt and SystemExit propagate.
        direct_3d_device = None

    if not direct_3d_device:
        media_capture = MediaCapture()

        async def coroutine():
            # ``initialize_async()`` may hand back nothing awaitable; fall
            # back to a no-op sleep so the ``await`` is always valid.
            await (media_capture.initialize_async() or asyncio.sleep(0))

        # NOTE(review): asyncio.run() cannot be used from a thread that is
        # already running an event loop, so this fallback only works when the
        # function is called from synchronous code.
        asyncio.run(coroutine())
        direct_3d_device = (
            media_capture.media_capture_settings
            and media_capture.media_capture_settings.direct3_d11_device
        )

    if not direct_3d_device:
        raise OSError("Unable to initialize a Direct3D Device.")
    return direct_3d_device


class Screenshot_WindowsGraphicsCapture:
    """Stream frames of a single window through Windows.Graphics.Capture.

    Typical use is ``async for image in instance.capture(): ...``. Each image
    is an independent NumPy ``uint8`` array of shape ``(height, width, 4)``.
    The channel order is presumably BGRA, following the frame pool's
    ``B8_G8_R8_A8_UINT_NORMALIZED`` format -- TODO confirm.
    """

    def __init__(self, hwnd: HWND) -> None:
        """Resolve the Direct3D device and the capture item for ``hwnd``.

        Args:
            hwnd: Handle of the window to capture, passed to
                ``create_for_window``.
        """
        self.device = get_direct3d_device()
        self.item = create_for_window(hwnd)
        self.frame_queue = asyncio.Queue()
        self.last_frame_timestamp = time.time()
        # Populated by start(), cleared by stop().
        self.frame_pool = None
        self.session = None
        self._frame_arrived_token = None
        # Strong references to in-flight process_frame() tasks; the event
        # loop itself only keeps weak references to tasks.
        self._pending = set()

    @staticmethod
    def _bitmap_to_array(software_bitmap):
        """Copy the pixels of ``software_bitmap`` into a new NumPy array.

        The returned array owns its memory, so it stays valid after the
        bitmap, its buffer and the buffer reference have been closed.

        Raises:
            ValueError: If the buffer size does not equal
                ``pixel_height * pixel_width * 4`` bytes.
        """
        buffer = reference = None
        try:
            buffer = software_bitmap.lock_buffer(BitmapBufferAccessMode.READ_WRITE)
            reference = buffer.create_reference()
            view = np.frombuffer(reference, dtype=np.uint8)
            # Use the bitmap's own dimensions: the capture item's size follows
            # the live window and may differ from the copied surface.
            shaped = view.reshape(
                software_bitmap.pixel_height, software_bitmap.pixel_width, 4
            )
            # Detach from bitmap-owned memory before it is released.
            return shaped.copy()
        finally:
            for resource in (reference, buffer):
                if resource is not None:
                    resource.close()

    async def process_frame(self, frame):
        """Convert one captured frame to an array and enqueue it.

        The frame (and the intermediate software bitmap) is always closed,
        even when conversion fails, so its buffer returns to the frame pool.
        After enqueueing, the time elapsed since the previous processed frame
        is printed in milliseconds.

        Args:
            frame: A frame obtained from the frame pool; it is closed here.
        """
        software_bitmap = None
        try:
            software_bitmap = await SoftwareBitmap.create_copy_from_surface_async(
                frame.surface
            )
            image = self._bitmap_to_array(software_bitmap)
        finally:
            if software_bitmap is not None:
                software_bitmap.close()
            frame.close()

        await self.frame_queue.put(image)
        now = time.time()
        print((now - self.last_frame_timestamp) * 1000, "ms")
        self.last_frame_timestamp = now

    def handle_frame(self, frame_pool, event_args):
        """Fetch the next frame from ``frame_pool`` and schedule its conversion.

        Must run on the event-loop thread; start() arranges this by
        dispatching the frame-arrived event through ``call_soon_threadsafe``.

        Args:
            frame_pool: Pool that raised the frame-arrived event.
            event_args: Event payload; unused.
        """
        frame = frame_pool.try_get_next_frame()

        # Handle no frame case
        if not frame:
            print("No frame...")
            return

        # Already on the loop thread, so a plain task is sufficient. Unlike a
        # discarded run_coroutine_threadsafe() future, a failed task is
        # reported by asyncio instead of disappearing silently.
        task = asyncio.get_running_loop().create_task(self.process_frame(frame))
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    def start(self):
        """Create the frame pool and session, then begin capturing.

        Must be called while an event loop is running in the current thread;
        frame-arrived events are forwarded to that loop. Any capture started
        earlier by this instance is stopped first.
        """
        self.stop()

        # Setting up the session
        self.frame_pool = Direct3D11CaptureFramePool.create_free_threaded(
            self.device,
            DirectXPixelFormat.B8_G8_R8_A8_UINT_NORMALIZED,
            32,
            self.item.size,
        )
        self.session = self.frame_pool.create_capture_session(self.item)
        self.session.is_border_required = False
        self.session.is_cursor_capture_enabled = False

        # The free-threaded pool raises its event off the loop thread, so
        # hand each notification over to the loop that called start().
        loop = asyncio.get_running_loop()

        # Add callback
        self._frame_arrived_token = self.frame_pool.add_frame_arrived(
            lambda fp, o: loop.call_soon_threadsafe(self.handle_frame, fp, o)
        )
        self.session.start_capture()

    def stop(self):
        """Stop capturing and release the session and frame pool.

        Safe to call when capture was never started or was already stopped.
        Frames that were already queued remain available in ``frame_queue``.
        """
        frame_pool = self.frame_pool
        session = self.session
        token = self._frame_arrived_token
        self.frame_pool = None
        self.session = None
        self._frame_arrived_token = None

        if frame_pool is not None and token is not None:
            frame_pool.remove_frame_arrived(token)
        # Conversions still in flight would touch frames of a closed pool.
        for task in list(self._pending):
            task.cancel()
        if session is not None:
            session.close()
        if frame_pool is not None:
            frame_pool.close()

    async def capture(self):
        """Start capture and yield frames as they arrive, indefinitely.

        Yields:
            One image array per processed frame, in queue order.

        The capture session and frame pool are released when the generator is
        closed or otherwise finalized.
        """
        self.start()
        try:
            while True:
                frame = await self.frame_queue.get()
                yield frame
                self.frame_queue.task_done()
        finally:
            self.stop()


async def main():
    """Find the target window by title and consume its captured frames.

    Raises:
        OSError: If no top-level window with the expected title exists.
    """
    window_title = "League of Legends (TM) Client"
    user32 = ctypes.windll.user32
    handle = user32.FindWindowW(None, window_title)
    # FindWindowW returns NULL (0) when nothing matches; fail here with a
    # clear message instead of passing an invalid handle to the capture API.
    if not handle:
        raise OSError(f"Window not found: {window_title!r}")
    s = Screenshot_WindowsGraphicsCapture(handle)
    async for frame in s.capture():
        # Frames are only drained; per-frame timing is printed by the
        # capture object itself.
        pass


# Script entry point: run the capture loop only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
Editor is loading...
Leave a Comment