From: Piotr Caban Subject: [PATCH v5 05/11] d3dx10: Add ID3DX10ThreadPump:AddWorkItem implementation. Message-Id: Date: Fri, 01 Jul 2022 18:01:06 +0000 In-Reply-To: References: From: Piotr Caban Signed-off-by: Piotr Caban Signed-off-by: Matteo Bruni --- dlls/d3dx10_43/async.c | 248 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 245 insertions(+), 3 deletions(-) diff --git a/dlls/d3dx10_43/async.c b/dlls/d3dx10_43/async.c index 7913f634c21..b22967e13fd 100644 --- a/dlls/d3dx10_43/async.c +++ b/dlls/d3dx10_43/async.c @@ -23,6 +23,7 @@ #include "dxhelpers.h" #include "wine/debug.h" +#include "wine/list.h" WINE_DEFAULT_DEBUG_CHANNEL(d3dx); @@ -610,10 +611,47 @@ HRESULT WINAPI D3DX10PreprocessShaderFromMemory(const char *data, SIZE_T data_si return E_NOTIMPL; } +struct work_item +{ + struct list entry; + + ID3DX10DataLoader *loader; + ID3DX10DataProcessor *processor; + HRESULT *result; + void **object; +}; + +static inline void work_item_free(struct work_item *work_item, BOOL cancel) +{ + ID3DX10DataLoader_Destroy(work_item->loader); + ID3DX10DataProcessor_Destroy(work_item->processor); + if (cancel && work_item->result) + *work_item->result = S_FALSE; + free(work_item); +} + +#define THREAD_PUMP_EXITING UINT_MAX struct thread_pump { ID3DX10ThreadPump ID3DX10ThreadPump_iface; LONG refcount; + + SRWLOCK io_lock; + CONDITION_VARIABLE io_cv; + unsigned int io_count; + struct list io_queue; + + SRWLOCK proc_lock; + CONDITION_VARIABLE proc_cv; + unsigned int proc_count; + struct list proc_queue; + + SRWLOCK device_lock; + unsigned int device_count; + struct list device_queue; + + unsigned int thread_count; + HANDLE threads[1]; }; static inline struct thread_pump *impl_from_ID3DX10ThreadPump(ID3DX10ThreadPump *iface) @@ -652,11 +690,49 @@ static ULONG WINAPI thread_pump_Release(ID3DX10ThreadPump *iface) { struct thread_pump *thread_pump = impl_from_ID3DX10ThreadPump(iface); ULONG refcount = InterlockedDecrement(&thread_pump->refcount); + struct work_item *item, *next; + struct list list; + unsigned int i; TRACE("%p decreasing refcount to %lu.\n", iface, refcount); if (!refcount) + { + AcquireSRWLockExclusive(&thread_pump->io_lock); + thread_pump->io_count = THREAD_PUMP_EXITING; + ReleaseSRWLockExclusive(&thread_pump->io_lock); + WakeAllConditionVariable(&thread_pump->io_cv); + + AcquireSRWLockExclusive(&thread_pump->proc_lock); + thread_pump->proc_count = THREAD_PUMP_EXITING; + ReleaseSRWLockExclusive(&thread_pump->proc_lock); + WakeAllConditionVariable(&thread_pump->proc_cv); + + AcquireSRWLockExclusive(&thread_pump->device_lock); + thread_pump->device_count = THREAD_PUMP_EXITING; + ReleaseSRWLockExclusive(&thread_pump->device_lock); + + for (i = 0; i < thread_pump->thread_count; ++i) + { + if (!thread_pump->threads[i]) + continue; + + WaitForSingleObject(thread_pump->threads[i], INFINITE); + CloseHandle(thread_pump->threads[i]); + } + + list_init(&list); + list_move_tail(&list, &thread_pump->io_queue); + list_move_tail(&list, &thread_pump->proc_queue); + list_move_tail(&list, &thread_pump->device_queue); + LIST_FOR_EACH_ENTRY_SAFE(item, next, &list, struct work_item, entry) + { + list_remove(&item->entry); + work_item_free(item, TRUE); + } + free(thread_pump); + } return refcount; } @@ -664,9 +740,30 @@ static ULONG WINAPI thread_pump_Release(ID3DX10ThreadPump *iface) static HRESULT WINAPI thread_pump_AddWorkItem(ID3DX10ThreadPump *iface, ID3DX10DataLoader *loader, ID3DX10DataProcessor *processor, HRESULT *result, void **object) { - FIXME("iface %p, loader %p, processor %p, result %p, object %p stub!\n", + struct thread_pump *thread_pump = impl_from_ID3DX10ThreadPump(iface); + struct work_item *work_item; + + TRACE("iface %p, loader %p, processor %p, result %p, object %p.\n", iface, loader, processor, result, object); - return E_NOTIMPL; + + work_item = malloc(sizeof(*work_item)); + if (!work_item) + return E_OUTOFMEMORY; + + work_item->loader = loader; + work_item->processor = processor; + work_item->result = result; + work_item->object = object; + + if (object) + *object = NULL; + + AcquireSRWLockExclusive(&thread_pump->io_lock); + ++thread_pump->io_count; + list_add_tail(&thread_pump->io_queue, &work_item->entry); + ReleaseSRWLockExclusive(&thread_pump->io_lock); + WakeConditionVariable(&thread_pump->io_cv); + return S_OK; } static UINT WINAPI thread_pump_GetWorkItemCount(ID3DX10ThreadPump *iface) @@ -714,20 +811,165 @@ static const ID3DX10ThreadPumpVtbl thread_pump_vtbl = thread_pump_GetQueueStatus }; +static DWORD WINAPI io_thread(void *arg) +{ + struct thread_pump *thread_pump = arg; + struct work_item *work_item; + HRESULT hr; + + TRACE("%p thread started.\n", thread_pump); + + for (;;) + { + AcquireSRWLockExclusive(&thread_pump->io_lock); + + while (!thread_pump->io_count) + SleepConditionVariableSRW(&thread_pump->io_cv, &thread_pump->io_lock, INFINITE, 0); + + if (thread_pump->io_count == THREAD_PUMP_EXITING) + { + ReleaseSRWLockExclusive(&thread_pump->io_lock); + return 0; + } + + --thread_pump->io_count; + work_item = LIST_ENTRY(list_head(&thread_pump->io_queue), struct work_item, entry); + list_remove(&work_item->entry); + ReleaseSRWLockExclusive(&thread_pump->io_lock); + + if (FAILED(hr = ID3DX10DataLoader_Load(work_item->loader))) + { + if (work_item->result) + *work_item->result = hr; + work_item_free(work_item, FALSE); + continue; + } + + AcquireSRWLockExclusive(&thread_pump->proc_lock); + if (thread_pump->proc_count == THREAD_PUMP_EXITING) + { + ReleaseSRWLockExclusive(&thread_pump->proc_lock); + work_item_free(work_item, TRUE); + return 0; + } + + list_add_tail(&thread_pump->proc_queue, &work_item->entry); + ++thread_pump->proc_count; + ReleaseSRWLockExclusive(&thread_pump->proc_lock); + WakeConditionVariable(&thread_pump->proc_cv); + } + return 0; +} + +static DWORD WINAPI proc_thread(void *arg) +{ + struct thread_pump *thread_pump = arg; + struct work_item *work_item; + SIZE_T size; + void *data; + HRESULT hr; + + TRACE("%p thread started.\n", thread_pump); + + for (;;) + { + AcquireSRWLockExclusive(&thread_pump->proc_lock); + + while (!thread_pump->proc_count) + SleepConditionVariableSRW(&thread_pump->proc_cv, &thread_pump->proc_lock, INFINITE, 0); + + if (thread_pump->proc_count == THREAD_PUMP_EXITING) + { + ReleaseSRWLockExclusive(&thread_pump->proc_lock); + return 0; + } + + --thread_pump->proc_count; + work_item = LIST_ENTRY(list_head(&thread_pump->proc_queue), struct work_item, entry); + list_remove(&work_item->entry); + ReleaseSRWLockExclusive(&thread_pump->proc_lock); + + if (FAILED(hr = ID3DX10DataLoader_Decompress(work_item->loader, &data, &size))) + { + if (work_item->result) + *work_item->result = hr; + work_item_free(work_item, FALSE); + continue; + } + + if (thread_pump->device_count == THREAD_PUMP_EXITING) + { + work_item_free(work_item, TRUE); + return 0; + } + + if (FAILED(hr = ID3DX10DataProcessor_Process(work_item->processor, data, size))) + { + if (work_item->result) + *work_item->result = hr; + work_item_free(work_item, FALSE); + continue; + } + + AcquireSRWLockExclusive(&thread_pump->device_lock); + if (thread_pump->device_count == THREAD_PUMP_EXITING) + { + ReleaseSRWLockExclusive(&thread_pump->device_lock); + work_item_free(work_item, TRUE); + return 0; + } + + list_add_tail(&thread_pump->device_queue, &work_item->entry); + ++thread_pump->device_count; + ReleaseSRWLockExclusive(&thread_pump->device_lock); + } + return 0; +} + HRESULT WINAPI D3DX10CreateThreadPump(UINT io_threads, UINT proc_threads, ID3DX10ThreadPump **pump) { struct thread_pump *object; + unsigned int i; TRACE("io_threads %u, proc_threads %u, pump %p.\n", io_threads, proc_threads, pump); if (io_threads >= 1024 || proc_threads >= 1024) return E_FAIL; - if (!(object = calloc(1, sizeof(*object)))) + if (!io_threads) + io_threads = 1; + if (!proc_threads) + { + SYSTEM_INFO info; + + GetSystemInfo(&info); + proc_threads = info.dwNumberOfProcessors; + } + + if (!(object = calloc(1, FIELD_OFFSET(struct thread_pump, threads[io_threads + proc_threads])))) return E_OUTOFMEMORY; object->ID3DX10ThreadPump_iface.lpVtbl = &thread_pump_vtbl; object->refcount = 1; + InitializeSRWLock(&object->io_lock); + InitializeConditionVariable(&object->io_cv); + list_init(&object->io_queue); + InitializeSRWLock(&object->proc_lock); + InitializeConditionVariable(&object->proc_cv); + list_init(&object->proc_queue); + InitializeSRWLock(&object->device_lock); + list_init(&object->device_queue); + object->thread_count = io_threads + proc_threads; + + for (i = 0; i < object->thread_count; ++i) + { + object->threads[i] = CreateThread(NULL, 0, i < io_threads ? io_thread : proc_thread, object, 0, NULL); + if (!object->threads[i]) + { + ID3DX10ThreadPump_Release(&object->ID3DX10ThreadPump_iface); + return E_FAIL; + } + } *pump = &object->ID3DX10ThreadPump_iface; return S_OK; -- GitLab https://gitlab.winehq.org/wine/wine/-/merge_requests/272