From: Bernhard Loos Subject: [PATCH] rpcrt4: switch to non-overlapped named pipe operations and use a thread for the async listen operation Message-Id: Date: Mon, 19 Sep 2011 13:01:36 +0200 --- dlls/rpcrt4/rpc_transport.c | 80 ++++++++++++++++++++++--------------------- 1 files changed, 41 insertions(+), 39 deletions(-) diff --git a/dlls/rpcrt4/rpc_transport.c b/dlls/rpcrt4/rpc_transport.c index c2dac67..4632f4a 100644 --- a/dlls/rpcrt4/rpc_transport.c +++ b/dlls/rpcrt4/rpc_transport.c @@ -111,8 +111,7 @@ typedef struct _RpcConnection_np { RpcConnection common; HANDLE pipe; - OVERLAPPED read_ovl; - OVERLAPPED write_ovl; + HANDLE listen_thread; BOOL listening; } RpcConnection_np; @@ -122,25 +121,21 @@ static RpcConnection *rpcrt4_conn_np_alloc(void) return &npc->common; } -static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc) +static DWORD CALLBACK listen_thread(void *arg) { - if (npc->listening) - return RPC_S_OK; - - npc->listening = TRUE; + RpcConnection_np *npc = arg; for (;;) { - if (ConnectNamedPipe(npc->pipe, &npc->read_ovl)) + if (ConnectNamedPipe(npc->pipe, NULL)) return RPC_S_OK; switch(GetLastError()) { case ERROR_PIPE_CONNECTED: - SetEvent(npc->read_ovl.hEvent); - return RPC_S_OK; - case ERROR_IO_PENDING: - /* will be completed in rpcrt4_protseq_np_wait_for_new_connection */ return RPC_S_OK; + case ERROR_HANDLES_CLOSED: + /* connection closed during listen */ + return RPC_S_NO_CONTEXT_AVAILABLE; case ERROR_NO_DATA_DETECTED: /* client has disconnected, retry */ DisconnectNamedPipe( npc->pipe ); @@ -153,12 +148,28 @@ static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc) } } +static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc) +{ + if (npc->listening) + return RPC_S_OK; + + npc->listening = TRUE; + npc->listen_thread = CreateThread(NULL, 0, listen_thread, npc, 0, NULL); + if (!npc->listen_thread) + { + npc->listening = FALSE; + ERR("Couldn't create listen thread (error was %d)\n", GetLastError()); + return RPC_S_OUT_OF_RESOURCES; + } + return RPC_S_OK; +} + static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pname) { RpcConnection_np *npc = (RpcConnection_np *) Connection; TRACE("listening on %s\n", pname); - npc->pipe = CreateNamedPipeA(pname, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + npc->pipe = CreateNamedPipeA(pname, PIPE_ACCESS_DUPLEX, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE, PIPE_UNLIMITED_INSTANCES, RPC_MAX_PACKET_SIZE, RPC_MAX_PACKET_SIZE, 5000, NULL); @@ -170,9 +181,6 @@ static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pnam return RPC_S_CANT_CREATE_ENDPOINT; } - npc->read_ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL); - npc->write_ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL); - /* Note: we don't call ConnectNamedPipe here because it must be done in the * server thread as the thread must be alertable */ return RPC_S_OK; @@ -231,8 +239,6 @@ static RPC_STATUS rpcrt4_conn_open_pipe(RpcConnection *Connection, LPCSTR pname, /* pipe is connected; change to message-read mode. */ dwMode = PIPE_READMODE_MESSAGE; SetNamedPipeHandleState(pipe, &dwMode, NULL, NULL); - npc->read_ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL); - npc->write_ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL); npc->pipe = pipe; return RPC_S_OK; @@ -360,11 +366,9 @@ static void rpcrt4_conn_np_handoff(RpcConnection_np *old_npc, RpcConnection_np * * to the child, then reopen the server binding to continue listening */ new_npc->pipe = old_npc->pipe; - new_npc->read_ovl = old_npc->read_ovl; - new_npc->write_ovl = old_npc->write_ovl; + new_npc->listen_thread = old_npc->listen_thread; old_npc->pipe = 0; - memset(&old_npc->read_ovl, 0, sizeof(old_npc->read_ovl)); - memset(&old_npc->write_ovl, 0, sizeof(old_npc->write_ovl)); + old_npc->listen_thread = 0; old_npc->listening = FALSE; } @@ -413,9 +417,7 @@ static int rpcrt4_conn_np_read(RpcConnection *Connection, while (bytes_left) { DWORD bytes_read; - ret = ReadFile(npc->pipe, buf, bytes_left, &bytes_read, &npc->read_ovl); - if (!ret && GetLastError() == ERROR_IO_PENDING) - ret = GetOverlappedResult(npc->pipe, &npc->read_ovl, &bytes_read, TRUE); + ret = ReadFile(npc->pipe, buf, bytes_left, &bytes_read, NULL); if (!ret && GetLastError() == ERROR_MORE_DATA) ret = TRUE; if (!ret || !bytes_read) @@ -437,9 +439,7 @@ static int rpcrt4_conn_np_write(RpcConnection *Connection, while (bytes_left) { DWORD bytes_written; - ret = WriteFile(npc->pipe, buf, bytes_left, &bytes_written, &npc->write_ovl); - if (!ret && GetLastError() == ERROR_IO_PENDING) - ret = GetOverlappedResult(npc->pipe, &npc->write_ovl, &bytes_written, TRUE); + ret = WriteFile(npc->pipe, buf, bytes_left, &bytes_written, NULL); if (!ret || !bytes_written) break; bytes_left -= bytes_written; @@ -456,13 +456,9 @@ static int rpcrt4_conn_np_close(RpcConnection *Connection) CloseHandle(npc->pipe); npc->pipe = 0; } - if (npc->read_ovl.hEvent) { - CloseHandle(npc->read_ovl.hEvent); - npc->read_ovl.hEvent = 0; - } - if (npc->write_ovl.hEvent) { - CloseHandle(npc->write_ovl.hEvent); - npc->write_ovl.hEvent = 0; + if (npc->listen_thread) { + CloseHandle(npc->listen_thread); + npc->listen_thread = 0; } return 0; } @@ -666,7 +662,7 @@ static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *p conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common); while (conn) { rpcrt4_conn_listen_pipe(conn); - if (conn->read_ovl.hEvent) + if (conn->listen_thread) (*count)++; conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common); } @@ -687,7 +683,7 @@ static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *p *count = 1; conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common); while (conn) { - if ((objs[*count] = conn->read_ovl.hEvent)) + if ((objs[*count] = conn->listen_thread)) (*count)++; conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common); } @@ -734,12 +730,18 @@ static int rpcrt4_protseq_np_wait_for_new_connection(RpcServerProtseq *protseq, EnterCriticalSection(&protseq->cs); conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common); while (conn) { - if (b_handle == conn->read_ovl.hEvent) break; + if (b_handle == conn->listen_thread) break; conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common); } cconn = NULL; if (conn) - RPCRT4_SpawnConnection(&cconn, &conn->common); + { + DWORD exit_code; + if (GetExitCodeThread(conn->listen_thread, &exit_code) && exit_code == RPC_S_OK) + RPCRT4_SpawnConnection(&cconn, &conn->common); + CloseHandle(conn->listen_thread); + conn->listen_thread = 0; + } else ERR("failed to locate connection for handle %p\n", b_handle); LeaveCriticalSection(&protseq->cs);