From: Jacek Caban Subject: [PATCH 12/16] server: Added support for message mode named pipes. Message-Id: Date: Wed, 5 Oct 2016 21:31:06 +0200 Signed-off-by: Jacek Caban --- This is the most important patch in the series. It moves named pipe I/O to server so that we may properly and portably support message mode. The series is prepared in a way that makes this patch no-op for byte mode named pipes. Although code would be much nicer if we moved them as well, I believe that scope and impact of this change is already huge. The biggest disadvantage of this change is performance. Server calls will always be slower than plain syscalls. This patch intends to implement proper results first and performance could be improved in follow ups. Ideally a simple read()/write() with immediate result could take only one server call. For that: - ntdll handle cache will need to store information about server-side pseudo fds, so that we don't need to call get_handle_fd just to get an error every time we do read/write - server calls could transfer the result directly when possible, instead of using APCs. That requires handling overlapped events on client side. Those shouldn't be too hard and I intend to look at them. dlls/kernel32/tests/pipe.c | 74 ++----- dlls/ntdll/file.c | 7 +- dlls/ntdll/tests/file.c | 2 +- server/async.c | 9 + server/file.h | 1 + server/named_pipe.c | 497 ++++++++++++++++++++++++++++++++++++++++++--- 6 files changed, 503 insertions(+), 87 deletions(-) diff --git a/dlls/kernel32/tests/pipe.c b/dlls/kernel32/tests/pipe.c index b3ba8b1..b928c10 100644 --- a/dlls/kernel32/tests/pipe.c +++ b/dlls/kernel32/tests/pipe.c @@ -255,9 +255,7 @@ static void test_CreateNamedPipe(int pipemode) else { SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf, 4, &readden, NULL), "ReadFile\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error\n"); } ok(readden == 4, "read got %d bytes\n", readden); @@ -278,15 +276,11 @@ static void test_CreateNamedPipe(int pipemode) else { SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf, 4, &readden, NULL), "ReadFile\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error\n"); ok(readden == 4, "read got %d bytes\n", readden); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf + 4, 4, &readden, NULL), "ReadFile\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error\n"); } ok(readden == 4, "read got %d bytes\n", readden); @@ -308,8 +302,7 @@ static void test_CreateNamedPipe(int pipemode) } else { - /* ok(readden == sizeof(obuf), "peek3 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek3 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek3 got %d bytes\n", readden); } ok(avail == sizeof(obuf) + sizeof(obuf2), "peek3 got %d bytes available\n", avail); pbuf = ibuf; @@ -339,8 +332,7 @@ static void test_CreateNamedPipe(int pipemode) } else { - /* ok(readden == sizeof(obuf), "peek4 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek4 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek4 got %d bytes\n", readden); } ok(avail == sizeof(obuf) + sizeof(obuf2), "peek4 got %d bytes available\n", avail); pbuf = ibuf; @@ -354,9 +346,7 @@ static void test_CreateNamedPipe(int pipemode) ok(readden == sizeof(obuf) + sizeof(obuf2), "read 4 got %d bytes\n", readden); } else { - todo_wine { - ok(readden == sizeof(obuf), "read 4 got %d bytes\n", readden); - } + ok(readden == sizeof(obuf), "read 4 got %d bytes\n", readden); } pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 4a check\n"); @@ -381,17 +371,13 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hnp, obuf2, sizeof(obuf2), &written, NULL), " WriteFile5b\n"); ok(written == sizeof(obuf2), "write file len 3b\n"); ok(PeekNamedPipe(hFile, ibuf, sizeof(ibuf), &readden, &avail, NULL), "Peek5\n"); - /* currently the Wine behavior depends on the kernel version */ - /* ok(readden == sizeof(obuf), "peek5 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek5 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek5 got %d bytes\n", readden); ok(avail == sizeof(obuf) + sizeof(obuf2), "peek5 got %d bytes available\n", avail); pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 5a check\n"); ok(ReadFile(hFile, ibuf, sizeof(ibuf), &readden, NULL), "ReadFile\n"); - todo_wine { - ok(readden == sizeof(obuf), "read 5 got %d bytes\n", readden); - } + ok(readden == sizeof(obuf), "read 5 got %d bytes\n", readden); pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 5a check\n"); if (readden <= sizeof(obuf)) @@ -400,10 +386,8 @@ static void test_CreateNamedPipe(int pipemode) /* Multiple writes in the reverse direction */ /* the write of obuf2 from write4 should still be in the buffer */ ok(PeekNamedPipe(hnp, ibuf, sizeof(ibuf), &readden, &avail, NULL), "Peek6a\n"); - todo_wine { - ok(readden == sizeof(obuf2), "peek6a got %d bytes\n", readden); - ok(avail == sizeof(obuf2), "peek6a got %d bytes available\n", avail); - } + ok(readden == sizeof(obuf2), "peek6a got %d bytes\n", readden); + ok(avail == sizeof(obuf2), "peek6a got %d bytes available\n", avail); if (avail > 0) { ok(ReadFile(hnp, ibuf, sizeof(ibuf), &readden, NULL), "ReadFile\n"); ok(readden == sizeof(obuf2), "read 6a got %d bytes\n", readden); @@ -416,17 +400,13 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hFile, obuf2, sizeof(obuf2), &written, NULL), " WriteFile6b\n"); ok(written == sizeof(obuf2), "write file len 6b\n"); ok(PeekNamedPipe(hnp, ibuf, sizeof(ibuf), &readden, &avail, NULL), "Peek6\n"); - /* currently the Wine behavior depends on the kernel version */ - /* ok(readden == sizeof(obuf), "peek6 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek6 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek6 got %d bytes\n", readden); ok(avail == sizeof(obuf) + sizeof(obuf2), "peek6b got %d bytes available\n", avail); pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 6a check\n"); ok(ReadFile(hnp, ibuf, sizeof(ibuf), &readden, NULL), "ReadFile\n"); - todo_wine { - ok(readden == sizeof(obuf), "read 6b got %d bytes\n", readden); - } + ok(readden == sizeof(obuf), "read 6b got %d bytes\n", readden); pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 6a check\n"); if (readden <= sizeof(obuf)) @@ -437,9 +417,7 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hnp, obuf2, sizeof(obuf2), &written, NULL), "WriteFile 7\n"); ok(written == sizeof(obuf2), "write file len 7\n"); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hFile, ibuf, 4, &readden, NULL), "ReadFile 7\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 7\n"); ok(readden == 4, "read got %d bytes 7\n", readden); ok(ReadFile(hFile, ibuf + 4, sizeof(ibuf) - 4, &readden, NULL), "ReadFile 7\n"); @@ -450,9 +428,7 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hFile, obuf, sizeof(obuf), &written, NULL), "WriteFile 8\n"); ok(written == sizeof(obuf), "write file len 8\n"); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf, 4, &readden, NULL), "ReadFile 8\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 8\n"); ok(readden == 4, "read got %d bytes 8\n", readden); ok(ReadFile(hnp, ibuf + 4, sizeof(ibuf) - 4, &readden, NULL), "ReadFile 8\n"); @@ -469,21 +445,16 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hnp, obuf2, sizeof(obuf2), &written, NULL), "WriteFile 9\n"); ok(written == sizeof(obuf2), "write file len 9\n"); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hFile, ibuf, 4, &readden, NULL), "ReadFile 9\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); SetLastError(0xdeadbeef); ret = RpcReadFile(hFile, ibuf + 4, 4, &readden, NULL); - todo_wine ok(!ret, "RpcReadFile 9\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); ret = RpcReadFile(hFile, ibuf + 8, sizeof(ibuf), &readden, NULL); ok(ret, "RpcReadFile 9\n"); - todo_wine ok(readden == sizeof(obuf) - 8, "read got %d bytes 9\n", readden); ok(memcmp(obuf, ibuf, sizeof(obuf)) == 0, "content check 9\n"); if (readden <= sizeof(obuf) - 8) /* blocks forever if second part was already received */ @@ -492,13 +463,10 @@ static void test_CreateNamedPipe(int pipemode) SetLastError(0xdeadbeef); ret = RpcReadFile(hFile, ibuf, 4, &readden, NULL); ok(!ret, "RpcReadFile 9\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hFile, ibuf + 4, 4, &readden, NULL), "ReadFile 9\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); ret = RpcReadFile(hFile, ibuf + 8, sizeof(ibuf), &readden, NULL); @@ -514,21 +482,16 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hFile, obuf, sizeof(obuf), &written, NULL), "WriteFile 10\n"); ok(written == sizeof(obuf), "write file len 10\n"); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf, 4, &readden, NULL), "ReadFile 10\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); ok(readden == 4, "read got %d bytes 10\n", readden); SetLastError(0xdeadbeef); ret = RpcReadFile(hnp, ibuf + 4, 4, &readden, NULL); - todo_wine ok(!ret, "RpcReadFile 10\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); ok(readden == 4, "read got %d bytes 10\n", readden); ret = RpcReadFile(hnp, ibuf + 8, sizeof(ibuf), &readden, NULL); ok(ret, "RpcReadFile 10\n"); - todo_wine ok(readden == sizeof(obuf2) - 8, "read got %d bytes 10\n", readden); ok(memcmp(obuf2, ibuf, sizeof(obuf2)) == 0, "content check 10\n"); if (readden <= sizeof(obuf2) - 8) /* blocks forever if second part was already received */ @@ -537,13 +500,10 @@ static void test_CreateNamedPipe(int pipemode) SetLastError(0xdeadbeef); ret = RpcReadFile(hnp, ibuf, 4, &readden, NULL); ok(!ret, "RpcReadFile 10\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); ok(readden == 4, "read got %d bytes 10\n", readden); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf + 4, 4, &readden, NULL), "ReadFile 10\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); ok(readden == 4, "read got %d bytes 10\n", readden); ret = RpcReadFile(hnp, ibuf + 8, sizeof(ibuf), &readden, NULL); @@ -1424,7 +1384,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hfile, buffer, 0, &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); numbytes = 0xdeadbeef; @@ -1447,7 +1407,7 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = WriteFile(hfile, testdata, sizeof(testdata), &numbytes, NULL); ok(!ret, "WriteFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); CloseHandle(hfile); @@ -1470,7 +1430,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hfile, buffer, sizeof(buffer), &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); ret = GetNamedPipeHandleStateA(hfile, &state, NULL, NULL, NULL, NULL, 0); @@ -1482,12 +1442,12 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = ReadFile(hfile, buffer, 0, &numbytes, NULL); ok(!ret, "ReadFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_BROKEN_PIPE, "expected ERROR_BROKEN_PIPE, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_BROKEN_PIPE, "expected ERROR_BROKEN_PIPE, got %u\n", GetLastError()); SetLastError(0xdeadbeef); ret = WriteFile(hfile, testdata, sizeof(testdata), &numbytes, NULL); ok(!ret, "WriteFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); CloseHandle(hfile); @@ -1541,7 +1501,7 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = WriteFile(hpipe, testdata, sizeof(testdata), &numbytes, NULL); ok(!ret, "WriteFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); CloseHandle(hpipe); @@ -1564,7 +1524,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hpipe, buffer, sizeof(buffer), &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); ret = GetNamedPipeHandleStateA(hpipe, &state, NULL, NULL, NULL, NULL, 0); @@ -1581,7 +1541,7 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = WriteFile(hpipe, testdata, sizeof(testdata), &numbytes, NULL); ok(!ret, "WriteFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); CloseHandle(hpipe); } diff --git a/dlls/ntdll/file.c b/dlls/ntdll/file.c index 40755cc..3bb22a3 100644 --- a/dlls/ntdll/file.c +++ b/dlls/ntdll/file.c @@ -1729,8 +1729,11 @@ NTSTATUS WINAPI NtFsControlFile(HANDLE handle, HANDLE event, PIO_APC_ROUTINE apc break; } - if ((status = server_get_unix_fd( handle, FILE_READ_DATA, &fd, &needs_close, NULL, NULL ))) - break; + status = server_get_unix_fd( handle, FILE_READ_DATA, &fd, &needs_close, NULL, NULL ); + if (status == STATUS_BAD_DEVICE_TYPE) + return server_ioctl_file( handle, event, apc, apc_context, io, code, + in_buffer, in_size, out_buffer, out_size ); + if (status) break; #ifdef FIONREAD if (ioctl( fd, FIONREAD, &avail ) != 0) diff --git a/dlls/ntdll/tests/file.c b/dlls/ntdll/tests/file.c index 586b2c9..2950cdc 100644 --- a/dlls/ntdll/tests/file.c +++ b/dlls/ntdll/tests/file.c @@ -1270,7 +1270,7 @@ static void test_iocp_fileio(HANDLE h) ok( completionKey == CKEY_SECOND, "Invalid completion key: %lx\n", completionKey ); ok( ioSb.Information == 0, "Invalid ioSb.Information: %ld\n", ioSb.Information ); /* wine sends wrong status here */ - todo_wine ok( U(ioSb).Status == STATUS_PIPE_BROKEN, "Invalid ioSb.Status: %x\n", U(ioSb).Status); + ok( U(ioSb).Status == STATUS_PIPE_BROKEN, "Invalid ioSb.Status: %x\n", U(ioSb).Status); ok( completionValue == (ULONG_PTR)&o, "Invalid completion value: %lx\n", completionValue ); } } diff --git a/server/async.c b/server/async.c index d63714a..9d28842 100644 --- a/server/async.c +++ b/server/async.c @@ -464,3 +464,12 @@ struct async *find_client_async( struct list *queues, struct process *process, c return NULL; } + +/* find the first pending async in queue */ +struct async *find_pending_async( struct async_queue *queue ) +{ + struct async *async; + if (queue) LIST_FOR_EACH_ENTRY( async, &queue->queue, struct async, queue_entry ) + if (async->status == STATUS_PENDING) return async; + return NULL; +} diff --git a/server/file.h b/server/file.h index fb459a4..d35887a 100644 --- a/server/file.h +++ b/server/file.h @@ -196,6 +196,7 @@ extern struct iosb *alloc_iosb( const void *in_data, data_size_t in_size, data_s extern struct iosb *grab_iosb( struct iosb *iosb ); extern void release_iosb( struct iosb *iosb ); extern struct iosb *async_get_iosb( struct async *async ); +extern struct async *find_pending_async( struct async_queue *queue ); /* access rights that require Unix read permission */ #define FILE_UNIX_READ_ACCESS (FILE_READ_DATA|FILE_READ_ATTRIBUTES|FILE_READ_EA) diff --git a/server/named_pipe.c b/server/named_pipe.c index afb96d0..437c40c 100644 --- a/server/named_pipe.c +++ b/server/named_pipe.c @@ -3,6 +3,7 @@ * * Copyright (C) 1998 Alexandre Julliard * Copyright (C) 2001 Mike McCormack + * Copyright 2016 Jacek Caban for CodeWeavers * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -17,9 +18,6 @@ * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA - * - * TODO: - * message mode */ #include "config.h" @@ -66,11 +64,25 @@ enum pipe_state struct named_pipe; +struct pipe_message +{ + struct list entry; /* entry in message queue */ + size_t size; /* size of message data */ + size_t read_pos; /* already read bytes */ + char *data; /* message data */ + struct async *async; /* async of pending write */ +}; + struct pipe_end { struct object obj; /* object header */ struct fd *fd; /* pipe file descriptor */ unsigned int flags; /* pipe flags */ + struct pipe_end *connection; /* the other end of the pipe */ + struct list message_queue; + struct async_queue *read_q; /* read queue */ + struct async_queue *write_q; /* write queue */ + data_size_t buffer_size;/* size of buffered data that doesn't block caller */ }; struct pipe_server @@ -142,6 +154,14 @@ static const struct object_ops named_pipe_ops = named_pipe_destroy /* destroy */ }; +/* common server and client pipe end functions */ +static obj_handle_t pipe_end_write( struct fd *fd, const async_data_t *async_data, int blocking, + file_pos_t pos, data_size_t *written ); +static obj_handle_t pipe_end_read( struct fd *fd, const async_data_t *async_data, int blocking, + file_pos_t pos ); +static void pipe_end_queue_async( struct fd *fd, const async_data_t *data, int type, int count ); +static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue ); + /* server end functions */ static void pipe_server_dump( struct object *obj, int verbose ); static struct fd *pipe_server_get_fd( struct object *obj ); @@ -178,12 +198,12 @@ static const struct fd_ops pipe_server_fd_ops = default_fd_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ pipe_server_get_fd_type, /* get_fd_type */ - no_fd_read, /* read */ - no_fd_write, /* write */ + pipe_end_read, /* read */ + pipe_end_write, /* write */ pipe_server_flush, /* flush */ pipe_server_ioctl, /* ioctl */ - default_fd_queue_async, /* queue_async */ - default_fd_reselect_async /* reselect_async */ + pipe_end_queue_async, /* queue_async */ + pipe_end_reselect_async /* reselect_async */ }; /* client end functions */ @@ -192,6 +212,8 @@ static int pipe_client_signaled( struct object *obj, struct wait_queue_entry *en static struct fd *pipe_client_get_fd( struct object *obj ); static void pipe_client_destroy( struct object *obj ); static obj_handle_t pipe_client_flush( struct fd *fd, const async_data_t *async, int blocking ); +static obj_handle_t pipe_client_ioctl( struct fd *fd, ioctl_code_t code, const async_data_t *async, + int blocking ); static enum server_fd_type pipe_client_get_fd_type( struct fd *fd ); static const struct object_ops pipe_client_ops = @@ -221,12 +243,12 @@ static const struct fd_ops pipe_client_fd_ops = default_fd_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ pipe_client_get_fd_type, /* get_fd_type */ - no_fd_read, /* read */ - no_fd_write, /* write */ + pipe_end_read, /* read */ + pipe_end_write, /* write */ pipe_client_flush, /* flush */ - default_fd_ioctl, /* ioctl */ - default_fd_queue_async, /* queue_async */ - default_fd_reselect_async /* reselect_async */ + pipe_client_ioctl, /* ioctl */ + pipe_end_queue_async, /* queue_async */ + pipe_end_reselect_async /* reselect_async */ }; static void named_pipe_device_dump( struct object *obj, int verbose ); @@ -276,6 +298,12 @@ static const struct fd_ops named_pipe_device_fd_ops = default_fd_reselect_async /* reselect_async */ }; +/* Returns if we handle I/O via server calls. Currently message-mode pipes are handled this way. */ +static int use_server_io( struct pipe_end *pipe_end ) +{ + return pipe_end->flags & NAMED_PIPE_MESSAGE_STREAM_WRITE; +} + static void named_pipe_dump( struct object *obj, int verbose ) { fputs( "Named pipe\n", stderr ); @@ -370,6 +398,56 @@ static void notify_empty( struct pipe_server *server ) fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); } +static void wake_message( struct pipe_message *message ) +{ + struct async *async = message->async; + struct iosb *iosb; + + if (!async) return; + + message->async = NULL; + iosb = async_get_iosb( async ); + iosb->status = STATUS_SUCCESS; + iosb->result = message->size; + async_terminate( async, iosb->result ? STATUS_ALERTED : STATUS_SUCCESS ); + release_object( async ); + release_iosb( iosb ); +} + +static void free_message( struct pipe_message *message ) +{ + list_remove( &message->entry ); + if( message->data ) free( message->data ); + free( message ); +} + +static void pipe_end_disconnect( struct pipe_end *pipe_end, NTSTATUS status ) +{ + struct pipe_end *connection = pipe_end->connection; + + pipe_end->connection = NULL; + + if (use_server_io( pipe_end )) + { + struct pipe_message *message, *next; + struct async *async; + if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status ); + async_wake_up( pipe_end->read_q, status ); + LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry ) + { + if (!(async = message->async)) continue; + free_message( message ); + async_terminate( async, status ); + release_object( async ); + } + } + if (connection) + { + connection->connection = NULL; + pipe_end_disconnect( connection, status ); + } +} + static void do_disconnect( struct pipe_server *server ) { /* we may only have a server fd, if the client disconnected */ @@ -377,27 +455,46 @@ static void do_disconnect( struct pipe_server *server ) { assert( server->client->server == server ); assert( server->client->pipe_end.fd ); - release_object( server->client->pipe_end.fd ); - server->client->pipe_end.fd = NULL; + if (!use_server_io( &server->pipe_end )) + { + release_object( server->client->pipe_end.fd ); + server->client->pipe_end.fd = NULL; + } } assert( server->pipe_end.fd ); - shutdown( get_unix_fd( server->pipe_end.fd ), SHUT_RDWR ); + if (!use_server_io( &server->pipe_end )) + shutdown( get_unix_fd( server->pipe_end.fd ), SHUT_RDWR ); release_object( server->pipe_end.fd ); server->pipe_end.fd = NULL; } +static void pipe_end_destroy( struct pipe_end *pipe_end ) +{ + struct pipe_message *message; + + while (!list_empty( &pipe_end->message_queue )) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + assert( !message->async ); + free_message( message ); + } +} + static void pipe_server_destroy( struct object *obj) { struct pipe_server *server = (struct pipe_server *)obj; assert( obj->ops == &pipe_server_ops ); + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_BROKEN ); + if (server->pipe_end.fd) { notify_empty( server ); do_disconnect( server ); } + pipe_end_destroy( &server->pipe_end ); if (server->client) { server->client->server = NULL; @@ -419,6 +516,8 @@ static void pipe_client_destroy( struct object *obj) assert( obj->ops == &pipe_client_ops ); + pipe_end_disconnect( &client->pipe_end, STATUS_PIPE_BROKEN ); + if (server) { notify_empty( server ); @@ -443,6 +542,8 @@ static void pipe_client_destroy( struct object *obj) server->client = NULL; client->server = NULL; } + + pipe_end_destroy( &client->pipe_end ); if (client->pipe_end.fd) release_object( client->pipe_end.fd ); } @@ -525,6 +626,9 @@ static int pipe_data_remaining( struct pipe_server *server ) assert( server->client ); + if (use_server_io( &server->pipe_end )) + return !list_empty( &server->client->pipe_end.message_queue ); + fd = get_unix_fd( server->client->pipe_end.fd ); if (fd < 0) return 0; @@ -558,6 +662,9 @@ static obj_handle_t pipe_end_flush( struct pipe_end *pipe_end, const async_data_ obj_handle_t handle = 0; struct async *async; + if (use_server_io( pipe_end ) && (!pipe_end->connection || list_empty( &pipe_end->connection->message_queue ))) + return 0; + if ((async = fd_queue_async( pipe_end->fd, async_data, NULL, ASYNC_TYPE_WAIT ))) { if (blocking) handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 ); @@ -579,7 +686,7 @@ static obj_handle_t pipe_server_flush( struct fd *fd, const async_data_t *async_ handle = pipe_end_flush( &server->pipe_end, async_data, blocking ); /* there's no unix way to be alerted when a pipe becomes empty, so resort to polling */ - if (handle && !server->flush_poll) + if (handle && !use_server_io( &server->pipe_end ) && !server->flush_poll) server->flush_poll = add_timeout_user( -TICKS_PER_SEC / 10, check_flushed, server ); return handle; } @@ -590,6 +697,257 @@ static obj_handle_t pipe_client_flush( struct fd *fd, const async_data_t *async, return 0; } +static void message_queue_read( struct pipe_end *pipe_end, struct iosb *iosb ) +{ + const int is_message_mode = pipe_end->flags & NAMED_PIPE_MESSAGE_STREAM_READ; + struct pipe_message *message; + + if (is_message_mode) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + iosb->out_size = min( iosb->out_size, message->size - message->read_pos ); + iosb->status = message->read_pos + iosb->out_size < message->size ? STATUS_BUFFER_OVERFLOW : STATUS_SUCCESS; + } + else + { + data_size_t avail = 0; + LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry ) + { + avail += message->size - message->read_pos; + if (avail >= iosb->out_size) break; + } + iosb->out_size = min( iosb->out_size, avail ); + iosb->status = STATUS_SUCCESS; + } + + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + if (!message->read_pos && message->size == iosb->out_size) /* fast path */ + { + iosb->out_data = message->data; + message->data = NULL; + wake_message( message ); + free_message( message ); + } + else + { + data_size_t write_pos = 0, writing; + char *buf = NULL; + + if (iosb->out_size && !(buf = iosb->out_data = malloc( iosb->out_size ))) + { + iosb->out_size = 0; + iosb->status = STATUS_NO_MEMORY; + return; + } + + do { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + writing = min( iosb->out_size - write_pos, message->size - message->read_pos ); + if (writing) memcpy( buf + write_pos, message->data + message->read_pos, writing ); + write_pos += writing; + message->read_pos += writing; + if (message->read_pos == message->size) + { + wake_message(message); + free_message(message); + } + } while (write_pos < iosb->out_size); + } + iosb->result = iosb->out_size; +} + +/* We call async_terminate in our reselect implementation, which causes recursive reselect. + * We're not interested in such reselect calls, so we ignore them. */ +static int ignore_reselect; + +static void reselect_write_queue( struct pipe_end *pipe_end ); + +static void reselect_read_queue( struct pipe_end *pipe_end ) +{ + struct async *async; + struct iosb *iosb; + int read_done = 0; + + ignore_reselect = 1; + + while (!list_empty( &pipe_end->message_queue) && (async = find_pending_async( pipe_end->read_q ))) + { + iosb = async_get_iosb( async ); + message_queue_read( pipe_end, iosb ); + async_terminate( async, iosb->result ? STATUS_ALERTED : iosb->status ); + release_iosb( iosb ); + read_done++; + } + + ignore_reselect = 0; + + if (pipe_end->connection) + { + if (list_empty( &pipe_end->message_queue )) + fd_async_wake_up( pipe_end->connection->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); + else if (read_done) + reselect_write_queue( pipe_end->connection ); + } +} + +static void reselect_write_queue( struct pipe_end *pipe_end ) +{ + struct pipe_message *message, *next; + struct pipe_end *reader = pipe_end->connection; + data_size_t avail = 0; + struct iosb *iosb; + struct async *async; + + if (!reader) return; + + ignore_reselect = 1; + + LIST_FOR_EACH_ENTRY_SAFE( message, next, &reader->message_queue, struct pipe_message, entry ) + { + if ((async = message->async)) + { + iosb = async_get_iosb( async ); + if (iosb->status != STATUS_PENDING) + { + message->async = NULL; + free_message( message ); + release_object( async ); + release_iosb( iosb ); + continue; + } + release_iosb( iosb ); + } + avail += message->size - message->read_pos; + if (message->async && (avail <= reader->buffer_size || !message->size)) + wake_message( message ); + } + + ignore_reselect = 0; + reselect_read_queue( reader ); +} + +static obj_handle_t pipe_end_read( struct fd *fd, const async_data_t *async_data, int blocking, + file_pos_t pos ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + obj_handle_t handle = 0; + struct async *async; + struct iosb *iosb; + + if (!use_server_io( pipe_end )) + { + set_error( STATUS_NOT_SUPPORTED ); + return 0; + } + + if (!pipe_end->connection && list_empty( &pipe_end->message_queue )) + { + set_error(STATUS_PIPE_BROKEN); + return 0; + } + + if (!pipe_end->read_q && !(pipe_end->read_q = create_async_queue( fd ))) return 0; + + if (!(iosb = alloc_iosb( NULL, 0, get_reply_max_size() ))) return 0; + if (!(async = create_async( current, pipe_end->read_q, async_data, iosb ))) + { + release_iosb( iosb ); + return 0; + } + + reselect_read_queue( pipe_end ); + + /* FIXME: we alloc wait handle for non-blocking reads that are already completed so that APC has a chance to transfer + * result to client immediately. It would be better to pass response directly in here, but that requires overlapped + * support on client side. */ + if (!(blocking || iosb->status != STATUS_PENDING) || (handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 ))) + set_error( STATUS_PENDING ); + + release_iosb( iosb ); + release_object( async ); + return handle; +} + +static obj_handle_t pipe_end_write( struct fd *fd, const async_data_t *async_data, int blocking, + file_pos_t pos, data_size_t *written ) +{ + struct pipe_end *write_end = get_fd_user( fd ); + struct pipe_end *read_end = write_end->connection; + data_size_t size = get_req_data_size(); + struct pipe_message *message; + obj_handle_t handle = 0; + struct async *async; + struct iosb *iosb; + + if (!use_server_io( write_end )) + { + set_error( STATUS_NOT_SUPPORTED ); + return 0; + } + + if (!read_end) + { + set_error( STATUS_PIPE_CLOSING ); + return 0; + } + + if (!write_end->write_q && !(write_end->write_q = create_async_queue( fd ))) return 0; + + if (!(message = mem_alloc(sizeof(*message)))) return 0; + message->size = size; + message->async = NULL; + message->read_pos = 0; + message->data = NULL; + + if (size && !(message->data = memdup(get_req_data(), size))) + { + free(message); + return 0; + } + + list_add_tail( &read_end->message_queue, &message->entry ); + + if (!(iosb = alloc_iosb( NULL, 0, 0 ))) return 0; + if (!(async = create_async( current, write_end->write_q, async_data, iosb ))) { + release_iosb( iosb ); + return 0; + } + message->async = (struct async *)grab_object( async ); + + reselect_write_queue( write_end ); + + /* FIXME: we alloc wait handle for non-blocking writes that are already completed so that APC has a chance to transfer + * result to client immediately. It would be better to pass response directly in here, but that requires overlapped + * support on client side. */ + if (!(blocking || iosb->status != STATUS_PENDING) || (handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 ))) + set_error( STATUS_PENDING ); + + release_object( async ); + release_iosb( iosb ); + *written = 0; + return handle; +} + +static void pipe_end_queue_async( struct fd *fd, const async_data_t *data, int type, int count ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + if (use_server_io( pipe_end )) no_fd_queue_async( fd, data, type, count ); + else default_fd_queue_async( fd, data, type, count ); +} + +static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + + if (ignore_reselect) return; + if (!use_server_io( pipe_end )) return default_fd_reselect_async( fd, queue ); + + if (pipe_end->write_q && pipe_end->write_q == queue) + reselect_write_queue( pipe_end ); + else if (pipe_end->read_q && pipe_end->read_q == queue) + reselect_read_queue( pipe_end ); +} + static inline int is_overlapped( unsigned int options ) { return !(options & (FILE_SYNCHRONOUS_IO_ALERT | FILE_SYNCHRONOUS_IO_NONALERT)); @@ -605,6 +963,44 @@ static enum server_fd_type pipe_client_get_fd_type( struct fd *fd ) return FD_TYPE_PIPE; } +static void pipe_end_peek( struct pipe_end *pipe_end ) +{ + unsigned reply_size = get_reply_max_size(); + FILE_PIPE_PEEK_BUFFER *buffer; + struct pipe_message *message; + size_t avail = 0; + + if (!use_server_io( pipe_end )) + { + set_error( STATUS_NOT_SUPPORTED ); + return; + } + + if (reply_size < FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data )) + { + set_error(STATUS_INFO_LENGTH_MISMATCH); + return; + } + reply_size -= FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data ); + + LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry ) + avail += message->size - message->read_pos; + reply_size = min( reply_size, avail ); + + if (avail) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + reply_size = min( reply_size, message->size ); + } + + if (!(buffer = set_reply_data_size(FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data[reply_size] )))) return; + buffer->NamedPipeState = 0; /* FIXME */ + buffer->ReadDataAvailable = avail; + buffer->NumberOfMessages = 0; /* FIXME */ + buffer->MessageLength = 0; /* FIXME */ + if (avail) memcpy( buffer->Data, message->data + message->read_pos, reply_size ); +} + static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, const async_data_t *async_data, int blocking ) { @@ -655,11 +1051,13 @@ static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, const a /* dump the client and server fds, but keep the pointers around - client loses all waiting data */ + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_DISCONNECTED ); do_disconnect( server ); set_server_state( server, ps_disconnected_server ); break; case ps_wait_disconnect: assert( !server->client ); + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_DISCONNECTED ); do_disconnect( server ); set_server_state( server, ps_wait_connect ); break; @@ -674,6 +1072,26 @@ static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, const a } return 0; + case FSCTL_PIPE_PEEK: + pipe_end_peek( &server->pipe_end ); + return 0; + + default: + return default_fd_ioctl( fd, code, async_data, blocking ); + } +} + +static obj_handle_t pipe_client_ioctl( struct fd *fd, ioctl_code_t code, const async_data_t *async_data, + int blocking ) +{ + struct pipe_client *client = get_fd_user( fd ); + + switch(code) + { + case FSCTL_PIPE_PEEK: + pipe_end_peek( &client->pipe_end ); + return 0; + default: return default_fd_ioctl( fd, code, async_data, blocking ); } @@ -687,10 +1105,15 @@ static struct pipe_server *get_pipe_server_obj( struct process *process, return (struct pipe_server *) obj; } -static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags ) +static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, data_size_t buffer_size ) { pipe_end->fd = NULL; pipe_end->flags = pipe_flags; + pipe_end->buffer_size = buffer_size; + pipe_end->connection = NULL; + pipe_end->read_q = NULL; + pipe_end->write_q = NULL; + list_init( &pipe_end->message_queue); } static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned int options, @@ -706,7 +1129,7 @@ static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned server->client = NULL; server->flush_poll = NULL; server->options = options; - init_pipe_end( &server->pipe_end, pipe_flags ); + init_pipe_end( &server->pipe_end, pipe_flags, pipe->insize ); list_add_head( &pipe->servers, &server->entry ); grab_object( pipe ); @@ -720,7 +1143,7 @@ static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned return server; } -static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int pipe_flags ) +static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int pipe_flags, data_size_t buffer_size ) { struct pipe_client *client; @@ -730,7 +1153,7 @@ static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int client->server = NULL; client->flags = flags; - init_pipe_end( &client->pipe_end, pipe_flags ); + init_pipe_end( &client->pipe_end, pipe_flags, buffer_size ); return client; } @@ -794,9 +1217,24 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc return NULL; } - if ((client = create_pipe_client( options, pipe->flags ))) + if ((client = create_pipe_client( options, pipe->flags, pipe->outsize ))) { - if (!socketpair( PF_UNIX, SOCK_STREAM, 0, fds )) + if (use_server_io( &server->pipe_end )) + { + client->pipe_end.fd = alloc_pseudo_fd( &pipe_client_fd_ops, &client->pipe_end.obj, options ); + if (client->pipe_end.fd) + { + set_fd_signaled( client->pipe_end.fd, 1 ); + server->pipe_end.fd = (struct fd *)grab_object( server->ioctl_fd ); + set_no_fd_status( server->ioctl_fd, STATUS_BAD_DEVICE_TYPE ); + } + else + { + release_object( client ); + client = NULL; + } + } + else if (!socketpair( PF_UNIX, SOCK_STREAM, 0, fds )) { assert( !server->pipe_end.fd ); @@ -824,11 +1262,6 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc allow_fd_caching( client->pipe_end.fd ); allow_fd_caching( server->pipe_end.fd ); fd_copy_completion( server->ioctl_fd, server->pipe_end.fd ); - if (server->state == ps_wait_open) - fd_async_wake_up( server->ioctl_fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); - set_server_state( server, ps_connected_server ); - server->client = client; - client->server = server; } else { @@ -842,6 +1275,16 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc release_object( client ); client = NULL; } + if (client) + { + if (server->state == ps_wait_open) + fd_async_wake_up( server->ioctl_fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); + set_server_state( server, ps_connected_server ); + server->client = client; + client->server = server; + server->pipe_end.connection = &client->pipe_end; + client->pipe_end.connection = &server->pipe_end; + } } release_object( server ); return &client->pipe_end.obj;