From: Sebastian Lackner Subject: [4/12] ntdll: Implement threadpool timer queues. Message-Id: <5595D025.1070201@fds-team.de> Date: Fri, 3 Jul 2015 01:58:29 +0200 --- dlls/ntdll/ntdll.spec | 3 dlls/ntdll/threadpool.c | 321 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 321 insertions(+), 3 deletions(-) diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec index 10d3652..ce9d1bb 100644 --- a/dlls/ntdll/ntdll.spec +++ b/dlls/ntdll/ntdll.spec @@ -981,6 +981,7 @@ @ stdcall TpCallbackSetEventOnCompletion(ptr long) @ stdcall TpCallbackUnloadDllOnCompletion(ptr ptr) @ stdcall TpDisassociateCallback(ptr) +@ stdcall TpIsTimerSet(ptr) @ stdcall TpPostWork(ptr) @ stdcall TpReleaseCleanupGroup(ptr) @ stdcall TpReleaseCleanupGroupMembers(ptr long ptr) @@ -989,7 +990,9 @@ @ stdcall TpReleaseWork(ptr) @ stdcall TpSetPoolMaxThreads(ptr long) @ stdcall TpSetPoolMinThreads(ptr long) +@ stdcall TpSetTimer(ptr ptr long long) @ stdcall TpSimpleTryPost(ptr ptr ptr) +@ stdcall TpWaitForTimer(ptr long) @ stdcall TpWaitForWork(ptr long) @ stdcall -ret64 VerSetConditionMask(int64 long long) @ stdcall WinSqmIsOptedIn() diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index cf714a6..ef502ba 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -77,14 +77,14 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_debug = { 0, 0, &old_threadpool.threadpool_cs, { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList }, - 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") } + 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") } }; static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug = { 0, 0, &old_threadpool.threadpool_compl_cs, { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList }, - 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") } + 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") } }; struct work_item @@ -200,6 +200,14 @@ struct threadpool_object struct { PTP_TIMER_CALLBACK callback; + /* information about the timer, locked via timerqueue.cs */ + BOOL timer_initialized; + BOOL timer_pending; + struct list timer_entry; + BOOL timer_set; + ULONGLONG timeout; + LONG period; + LONG window_length; } timer; } u; }; @@ -232,6 +240,33 @@ struct threadpool_group struct list members; }; +/* global timerqueue object */ +static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug; + +static struct +{ + CRITICAL_SECTION cs; + LONG objcount; + BOOL thread_running; + struct list pending_timers; + RTL_CONDITION_VARIABLE update_event; +} +timerqueue = +{ + { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */ + 0, /* objcount */ + FALSE, /* thread_running */ + LIST_INIT( timerqueue.pending_timers ), /* pending_timers */ + RTL_CONDITION_VARIABLE_INIT /* update_event */ +}; + +static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug = +{ + 0, 0, &timerqueue.cs, + { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList }, + 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") } +}; + static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool ) { return (struct threadpool *)pool; @@ -1186,6 +1221,171 @@ NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer, } /*********************************************************************** + * timerqueue_thread_proc (internal) + */ +static void CALLBACK timerqueue_thread_proc( void *param ) +{ + ULONGLONG timeout_lower, timeout_upper, new_timeout; + struct threadpool_object *other_timer; + LARGE_INTEGER now, timeout; + struct list *ptr; + + TRACE( "starting timer queue thread\n" ); + + RtlEnterCriticalSection( &timerqueue.cs ); + for (;;) + { + NtQuerySystemTime( &now ); + + /* Check for expired timers. */ + while ((ptr = list_head( &timerqueue.pending_timers ))) + { + struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry ); + assert( timer->type == TP_OBJECT_TYPE_TIMER ); + assert( timer->u.timer.timer_pending ); + if (timer->u.timer.timeout > now.QuadPart) + break; + + /* Queue a new callback in one of the worker threads. */ + list_remove( &timer->u.timer.timer_entry ); + timer->u.timer.timer_pending = FALSE; + tp_object_submit( timer ); + + /* Insert the timer back into the queue, except its marked for shutdown. */ + if (timer->u.timer.period && !timer->shutdown) + { + timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000; + if (timer->u.timer.timeout <= now.QuadPart) + timer->u.timer.timeout = now.QuadPart + 1; + + LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers, + struct threadpool_object, u.timer.timer_entry ) + { + assert( other_timer->type == TP_OBJECT_TYPE_TIMER ); + if (timer->u.timer.timeout < other_timer->u.timer.timeout) + break; + } + list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry ); + timer->u.timer.timer_pending = TRUE; + } + } + + timeout_lower = TIMEOUT_INFINITE; + timeout_upper = TIMEOUT_INFINITE; + + /* Determine next timeout and use the window length to optimize wakeup times. */ + LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers, + struct threadpool_object, u.timer.timer_entry ) + { + assert( other_timer->type == TP_OBJECT_TYPE_TIMER ); + if (other_timer->u.timer.timeout >= timeout_upper) + break; + + timeout_lower = other_timer->u.timer.timeout; + new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000; + if (new_timeout < timeout_upper) + timeout_upper = new_timeout; + } + + /* Wait for timer update events or until the next timer expires. */ + if (timerqueue.objcount) + { + timeout.QuadPart = timeout_lower; + RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout ); + continue; + } + + /* All timers have been destroyed, if no new timers are created + * within some amount of time, then we can shutdown this thread. */ + timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; + if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, + &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount) + { + break; + } + } + + timerqueue.thread_running = FALSE; + RtlLeaveCriticalSection( &timerqueue.cs ); + + TRACE( "terminating timer queue thread\n" ); +} + +/*********************************************************************** + * tp_timerqueue_lock (internal) + * + * Acquires a lock on the global timerqueue. When the lock is acquired + * successfully, it is guaranteed that the timer thread is running. + */ +static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer ) +{ + NTSTATUS status = STATUS_SUCCESS; + assert( timer->type == TP_OBJECT_TYPE_TIMER ); + + timer->u.timer.timer_initialized = FALSE; + timer->u.timer.timer_pending = FALSE; + timer->u.timer.timer_set = FALSE; + timer->u.timer.timeout = 0; + timer->u.timer.period = 0; + timer->u.timer.window_length = 0; + + RtlEnterCriticalSection( &timerqueue.cs ); + + /* Make sure that the timerqueue thread is running. */ + if (!timerqueue.thread_running) + { + HANDLE thread; + status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0, + timerqueue_thread_proc, NULL, &thread, NULL ); + if (status == STATUS_SUCCESS) + { + timerqueue.thread_running = TRUE; + NtClose( thread ); + } + } + + if (status == STATUS_SUCCESS) + { + timer->u.timer.timer_initialized = TRUE; + timerqueue.objcount++; + } + + RtlLeaveCriticalSection( &timerqueue.cs ); + return status; +} + +/*********************************************************************** + * tp_timerqueue_unlock (internal) + * + * Releases a lock on the global timerqueue. + */ +static void tp_timerqueue_unlock( struct threadpool_object *timer ) +{ + assert( timer->type == TP_OBJECT_TYPE_TIMER ); + + RtlEnterCriticalSection( &timerqueue.cs ); + if (timer->u.timer.timer_initialized) + { + /* If timer was pending, remove it. */ + if (timer->u.timer.timer_pending) + { + list_remove( &timer->u.timer.timer_entry ); + timer->u.timer.timer_pending = FALSE; + } + + /* If the last timer object was destroyed, then wake up the thread. */ + if (!--timerqueue.objcount) + { + assert( list_empty( &timerqueue.pending_timers ) ); + RtlWakeAllConditionVariable( &timerqueue.update_event ); + } + + timer->u.timer.timer_initialized = FALSE; + } + RtlLeaveCriticalSection( &timerqueue.cs ); +} + +/*********************************************************************** * tp_threadpool_alloc (internal) * * Allocates a new threadpool object. @@ -1583,6 +1783,9 @@ static void tp_object_wait( struct threadpool_object *object, BOOL group_wait ) */ static void tp_object_shutdown( struct threadpool_object *object ) { + if (object->type == TP_OBJECT_TYPE_TIMER) + tp_timerqueue_unlock( object ); + object->shutdown = TRUE; } @@ -1785,7 +1988,6 @@ static void CALLBACK threadpool_worker_proc( void *param ) tp_threadpool_release( pool ); } - /*********************************************************************** * TpAllocCleanupGroup (NTDLL.@) */ @@ -1834,6 +2036,15 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID object->type = TP_OBJECT_TYPE_TIMER; object->u.timer.callback = callback; + + status = tp_timerqueue_lock( object ); + if (status) + { + tp_threadpool_unlock( pool ); + RtlFreeHeap( GetProcessHeap(), 0, object ); + return status; + } + tp_object_initialize( object, pool, userdata, environment ); *out = (TP_TIMER *)object; @@ -2021,6 +2232,18 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance ) } /*********************************************************************** + * TpIsTimerSet (NTDLL.@) + */ +BOOL WINAPI TpIsTimerSet( TP_TIMER *timer ) +{ + struct threadpool_object *this = impl_from_TP_TIMER( timer ); + + TRACE( "%p\n", timer ); + + return this->u.timer.timer_set; +} + +/*********************************************************************** * TpPostWork (NTDLL.@) */ VOID WINAPI TpPostWork( TP_WORK *work ) @@ -2196,6 +2419,84 @@ BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum ) } /*********************************************************************** + * TpSetTimer (NTDLL.@) + */ +VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length ) +{ + struct threadpool_object *this = impl_from_TP_TIMER( timer ); + struct threadpool_object *other_timer; + BOOL submit_timer = FALSE; + ULONGLONG timestamp; + + TRACE( "%p %p %u %u\n", timer, timeout, period, window_length ); + + RtlEnterCriticalSection( &timerqueue.cs ); + + assert( this->u.timer.timer_initialized ); + this->u.timer.timer_set = timeout != NULL; + + /* Convert relative timeout to absolute timestamp and handle a timeout + * of zero, which means that the timer is submitted immediately. */ + if (timeout) + { + timestamp = timeout->QuadPart; + if ((LONGLONG)timestamp < 0) + { + LARGE_INTEGER now; + NtQuerySystemTime( &now ); + timestamp = now.QuadPart - timestamp; + } + else if (!timestamp) + { + if (!period) + timeout = NULL; + else + { + LARGE_INTEGER now; + NtQuerySystemTime( &now ); + timestamp = now.QuadPart + (ULONGLONG)period * 10000; + } + submit_timer = TRUE; + } + } + + /* First remove existing timeout. */ + if (this->u.timer.timer_pending) + { + list_remove( &this->u.timer.timer_entry ); + this->u.timer.timer_pending = FALSE; + } + + /* If the timer was enabled, then add it back to the queue. */ + if (timeout) + { + this->u.timer.timeout = timestamp; + this->u.timer.period = period; + this->u.timer.window_length = window_length; + + LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers, + struct threadpool_object, u.timer.timer_entry ) + { + assert( other_timer->type == TP_OBJECT_TYPE_TIMER ); + if (this->u.timer.timeout < other_timer->u.timer.timeout) + break; + } + list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry ); + + /* Wake up the timer thread when the timeout has to be updated. */ + if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry ) + RtlWakeAllConditionVariable( &timerqueue.update_event ); + + this->u.timer.timer_pending = TRUE; + } + + RtlLeaveCriticalSection( &timerqueue.cs ); + + if (submit_timer) + tp_object_submit( this ); +} + +/*********************************************************************** * TpSimpleTryPost (NTDLL.@) */ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, @@ -2226,6 +2527,20 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, } /*********************************************************************** + * TpWaitForTimer (NTDLL.@) + */ +VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending ) +{ + struct threadpool_object *this = impl_from_TP_TIMER( timer ); + + TRACE( "%p %d\n", timer, cancel_pending ); + + if (cancel_pending) + tp_object_cancel( this, FALSE, NULL ); + tp_object_wait( this, FALSE ); +} + +/*********************************************************************** * TpWaitForWork (NTDLL.@) */ VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending ) -- 2.4.4