From: Jacek Caban Subject: [PATCH 07/10] server: Added support for message mode named pipes. Message-Id: <8e0cc14e-5817-131e-5012-a2c745f23a83@codeweavers.com> Date: Wed, 19 Oct 2016 19:05:31 +0200 Signed-off-by: Jacek Caban --- dlls/kernel32/tests/pipe.c | 74 ++----- dlls/ntdll/file.c | 7 +- dlls/ntdll/tests/file.c | 2 +- server/async.c | 9 + server/file.h | 1 + server/named_pipe.c | 532 ++++++++++++++++++++++++++++++++++++++++++--- 6 files changed, 535 insertions(+), 90 deletions(-) diff --git a/dlls/kernel32/tests/pipe.c b/dlls/kernel32/tests/pipe.c index b3ba8b1..b928c10 100644 --- a/dlls/kernel32/tests/pipe.c +++ b/dlls/kernel32/tests/pipe.c @@ -255,9 +255,7 @@ static void test_CreateNamedPipe(int pipemode) else { SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf, 4, &readden, NULL), "ReadFile\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error\n"); } ok(readden == 4, "read got %d bytes\n", readden); @@ -278,15 +276,11 @@ static void test_CreateNamedPipe(int pipemode) else { SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf, 4, &readden, NULL), "ReadFile\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error\n"); ok(readden == 4, "read got %d bytes\n", readden); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf + 4, 4, &readden, NULL), "ReadFile\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error\n"); } ok(readden == 4, "read got %d bytes\n", readden); @@ -308,8 +302,7 @@ static void test_CreateNamedPipe(int pipemode) } else { - /* ok(readden == sizeof(obuf), "peek3 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek3 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek3 got %d bytes\n", readden); } ok(avail == sizeof(obuf) + sizeof(obuf2), "peek3 got %d bytes available\n", avail); pbuf = ibuf; @@ -339,8 +332,7 @@ static void test_CreateNamedPipe(int pipemode) } else { - /* ok(readden == sizeof(obuf), "peek4 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek4 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek4 got %d bytes\n", readden); } ok(avail == sizeof(obuf) + sizeof(obuf2), "peek4 got %d bytes available\n", avail); pbuf = ibuf; @@ -354,9 +346,7 @@ static void test_CreateNamedPipe(int pipemode) ok(readden == sizeof(obuf) + sizeof(obuf2), "read 4 got %d bytes\n", readden); } else { - todo_wine { - ok(readden == sizeof(obuf), "read 4 got %d bytes\n", readden); - } + ok(readden == sizeof(obuf), "read 4 got %d bytes\n", readden); } pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 4a check\n"); @@ -381,17 +371,13 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hnp, obuf2, sizeof(obuf2), &written, NULL), " WriteFile5b\n"); ok(written == sizeof(obuf2), "write file len 3b\n"); ok(PeekNamedPipe(hFile, ibuf, sizeof(ibuf), &readden, &avail, NULL), "Peek5\n"); - /* currently the Wine behavior depends on the kernel version */ - /* ok(readden == sizeof(obuf), "peek5 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek5 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek5 got %d bytes\n", readden); ok(avail == sizeof(obuf) + sizeof(obuf2), "peek5 got %d bytes available\n", avail); pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 5a check\n"); ok(ReadFile(hFile, ibuf, sizeof(ibuf), &readden, NULL), "ReadFile\n"); - todo_wine { - ok(readden == sizeof(obuf), "read 5 got %d bytes\n", readden); - } + ok(readden == sizeof(obuf), "read 5 got %d bytes\n", readden); pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 5a check\n"); if (readden <= sizeof(obuf)) @@ -400,10 +386,8 @@ static void test_CreateNamedPipe(int pipemode) /* Multiple writes in the reverse direction */ /* the write of obuf2 from write4 should still be in the buffer */ ok(PeekNamedPipe(hnp, ibuf, sizeof(ibuf), &readden, &avail, NULL), "Peek6a\n"); - todo_wine { - ok(readden == sizeof(obuf2), "peek6a got %d bytes\n", readden); - ok(avail == sizeof(obuf2), "peek6a got %d bytes available\n", avail); - } + ok(readden == sizeof(obuf2), "peek6a got %d bytes\n", readden); + ok(avail == sizeof(obuf2), "peek6a got %d bytes available\n", avail); if (avail > 0) { ok(ReadFile(hnp, ibuf, sizeof(ibuf), &readden, NULL), "ReadFile\n"); ok(readden == sizeof(obuf2), "read 6a got %d bytes\n", readden); @@ -416,17 +400,13 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hFile, obuf2, sizeof(obuf2), &written, NULL), " WriteFile6b\n"); ok(written == sizeof(obuf2), "write file len 6b\n"); ok(PeekNamedPipe(hnp, ibuf, sizeof(ibuf), &readden, &avail, NULL), "Peek6\n"); - /* currently the Wine behavior depends on the kernel version */ - /* ok(readden == sizeof(obuf), "peek6 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek6 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek6 got %d bytes\n", readden); ok(avail == sizeof(obuf) + sizeof(obuf2), "peek6b got %d bytes available\n", avail); pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 6a check\n"); ok(ReadFile(hnp, ibuf, sizeof(ibuf), &readden, NULL), "ReadFile\n"); - todo_wine { - ok(readden == sizeof(obuf), "read 6b got %d bytes\n", readden); - } + ok(readden == sizeof(obuf), "read 6b got %d bytes\n", readden); pbuf = ibuf; ok(memcmp(obuf, pbuf, sizeof(obuf)) == 0, "content 6a check\n"); if (readden <= sizeof(obuf)) @@ -437,9 +417,7 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hnp, obuf2, sizeof(obuf2), &written, NULL), "WriteFile 7\n"); ok(written == sizeof(obuf2), "write file len 7\n"); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hFile, ibuf, 4, &readden, NULL), "ReadFile 7\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 7\n"); ok(readden == 4, "read got %d bytes 7\n", readden); ok(ReadFile(hFile, ibuf + 4, sizeof(ibuf) - 4, &readden, NULL), "ReadFile 7\n"); @@ -450,9 +428,7 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hFile, obuf, sizeof(obuf), &written, NULL), "WriteFile 8\n"); ok(written == sizeof(obuf), "write file len 8\n"); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf, 4, &readden, NULL), "ReadFile 8\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 8\n"); ok(readden == 4, "read got %d bytes 8\n", readden); ok(ReadFile(hnp, ibuf + 4, sizeof(ibuf) - 4, &readden, NULL), "ReadFile 8\n"); @@ -469,21 +445,16 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hnp, obuf2, sizeof(obuf2), &written, NULL), "WriteFile 9\n"); ok(written == sizeof(obuf2), "write file len 9\n"); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hFile, ibuf, 4, &readden, NULL), "ReadFile 9\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); SetLastError(0xdeadbeef); ret = RpcReadFile(hFile, ibuf + 4, 4, &readden, NULL); - todo_wine ok(!ret, "RpcReadFile 9\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); ret = RpcReadFile(hFile, ibuf + 8, sizeof(ibuf), &readden, NULL); ok(ret, "RpcReadFile 9\n"); - todo_wine ok(readden == sizeof(obuf) - 8, "read got %d bytes 9\n", readden); ok(memcmp(obuf, ibuf, sizeof(obuf)) == 0, "content check 9\n"); if (readden <= sizeof(obuf) - 8) /* blocks forever if second part was already received */ @@ -492,13 +463,10 @@ static void test_CreateNamedPipe(int pipemode) SetLastError(0xdeadbeef); ret = RpcReadFile(hFile, ibuf, 4, &readden, NULL); ok(!ret, "RpcReadFile 9\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hFile, ibuf + 4, 4, &readden, NULL), "ReadFile 9\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); ret = RpcReadFile(hFile, ibuf + 8, sizeof(ibuf), &readden, NULL); @@ -514,21 +482,16 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hFile, obuf, sizeof(obuf), &written, NULL), "WriteFile 10\n"); ok(written == sizeof(obuf), "write file len 10\n"); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf, 4, &readden, NULL), "ReadFile 10\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); ok(readden == 4, "read got %d bytes 10\n", readden); SetLastError(0xdeadbeef); ret = RpcReadFile(hnp, ibuf + 4, 4, &readden, NULL); - todo_wine ok(!ret, "RpcReadFile 10\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); ok(readden == 4, "read got %d bytes 10\n", readden); ret = RpcReadFile(hnp, ibuf + 8, sizeof(ibuf), &readden, NULL); ok(ret, "RpcReadFile 10\n"); - todo_wine ok(readden == sizeof(obuf2) - 8, "read got %d bytes 10\n", readden); ok(memcmp(obuf2, ibuf, sizeof(obuf2)) == 0, "content check 10\n"); if (readden <= sizeof(obuf2) - 8) /* blocks forever if second part was already received */ @@ -537,13 +500,10 @@ static void test_CreateNamedPipe(int pipemode) SetLastError(0xdeadbeef); ret = RpcReadFile(hnp, ibuf, 4, &readden, NULL); ok(!ret, "RpcReadFile 10\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); ok(readden == 4, "read got %d bytes 10\n", readden); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hnp, ibuf + 4, 4, &readden, NULL), "ReadFile 10\n"); - todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); ok(readden == 4, "read got %d bytes 10\n", readden); ret = RpcReadFile(hnp, ibuf + 8, sizeof(ibuf), &readden, NULL); @@ -1424,7 +1384,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hfile, buffer, 0, &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); numbytes = 0xdeadbeef; @@ -1447,7 +1407,7 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = WriteFile(hfile, testdata, sizeof(testdata), &numbytes, NULL); ok(!ret, "WriteFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); CloseHandle(hfile); @@ -1470,7 +1430,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hfile, buffer, sizeof(buffer), &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); ret = GetNamedPipeHandleStateA(hfile, &state, NULL, NULL, NULL, NULL, 0); @@ -1482,12 +1442,12 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = ReadFile(hfile, buffer, 0, &numbytes, NULL); ok(!ret, "ReadFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_BROKEN_PIPE, "expected ERROR_BROKEN_PIPE, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_BROKEN_PIPE, "expected ERROR_BROKEN_PIPE, got %u\n", GetLastError()); SetLastError(0xdeadbeef); ret = WriteFile(hfile, testdata, sizeof(testdata), &numbytes, NULL); ok(!ret, "WriteFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); CloseHandle(hfile); @@ -1541,7 +1501,7 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = WriteFile(hpipe, testdata, sizeof(testdata), &numbytes, NULL); ok(!ret, "WriteFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); CloseHandle(hpipe); @@ -1564,7 +1524,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hpipe, buffer, sizeof(buffer), &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); ret = GetNamedPipeHandleStateA(hpipe, &state, NULL, NULL, NULL, NULL, 0); @@ -1581,7 +1541,7 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = WriteFile(hpipe, testdata, sizeof(testdata), &numbytes, NULL); ok(!ret, "WriteFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_NO_DATA, "expected ERROR_NO_DATA, got %u\n", GetLastError()); CloseHandle(hpipe); } diff --git a/dlls/ntdll/file.c b/dlls/ntdll/file.c index ac8b765..7ef6956 100644 --- a/dlls/ntdll/file.c +++ b/dlls/ntdll/file.c @@ -1731,8 +1731,11 @@ NTSTATUS WINAPI NtFsControlFile(HANDLE handle, HANDLE event, PIO_APC_ROUTINE apc break; } - if ((status = server_get_unix_fd( handle, FILE_READ_DATA, &fd, &needs_close, NULL, NULL ))) - break; + status = server_get_unix_fd( handle, FILE_READ_DATA, &fd, &needs_close, NULL, NULL ); + if (status == STATUS_BAD_DEVICE_TYPE) + return server_ioctl_file( handle, event, apc, apc_context, io, code, + in_buffer, in_size, out_buffer, out_size ); + if (status) break; #ifdef FIONREAD if (ioctl( fd, FIONREAD, &avail ) != 0) diff --git a/dlls/ntdll/tests/file.c b/dlls/ntdll/tests/file.c index 586b2c9..2950cdc 100644 --- a/dlls/ntdll/tests/file.c +++ b/dlls/ntdll/tests/file.c @@ -1270,7 +1270,7 @@ static void test_iocp_fileio(HANDLE h) ok( completionKey == CKEY_SECOND, "Invalid completion key: %lx\n", completionKey ); ok( ioSb.Information == 0, "Invalid ioSb.Information: %ld\n", ioSb.Information ); /* wine sends wrong status here */ - todo_wine ok( U(ioSb).Status == STATUS_PIPE_BROKEN, "Invalid ioSb.Status: %x\n", U(ioSb).Status); + ok( U(ioSb).Status == STATUS_PIPE_BROKEN, "Invalid ioSb.Status: %x\n", U(ioSb).Status); ok( completionValue == (ULONG_PTR)&o, "Invalid completion value: %lx\n", completionValue ); } } diff --git a/server/async.c b/server/async.c index 707bab5..cfdd2d0 100644 --- a/server/async.c +++ b/server/async.c @@ -447,3 +447,12 @@ struct async *find_async( struct async_queue *queue, struct process *process, cl return NULL; } + +/* find the first pending async in queue */ +struct async *find_pending_async( struct async_queue *queue ) +{ + struct async *async; + if (queue) LIST_FOR_EACH_ENTRY( async, &queue->queue, struct async, queue_entry ) + if (async->status == STATUS_PENDING) return async; + return NULL; +} diff --git a/server/file.h b/server/file.h index 75a2e03..4ffec3d 100644 --- a/server/file.h +++ b/server/file.h @@ -200,6 +200,7 @@ extern struct iosb *alloc_iosb( const void *in_data, data_size_t in_size, data_s extern struct iosb *grab_iosb( struct iosb *iosb ); extern void release_iosb( struct iosb *iosb ); extern struct iosb *async_get_iosb( struct async *async ); +extern struct async *find_pending_async( struct async_queue *queue ); /* access rights that require Unix read permission */ #define FILE_UNIX_READ_ACCESS (FILE_READ_DATA|FILE_READ_ATTRIBUTES|FILE_READ_EA) diff --git a/server/named_pipe.c b/server/named_pipe.c index 2312b63..3ca0ebb 100644 --- a/server/named_pipe.c +++ b/server/named_pipe.c @@ -3,6 +3,7 @@ * * Copyright (C) 1998 Alexandre Julliard * Copyright (C) 2001 Mike McCormack + * Copyright 2016 Jacek Caban for CodeWeavers * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -17,9 +18,6 @@ * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA - * - * TODO: - * message mode */ #include "config.h" @@ -66,11 +64,25 @@ enum pipe_state struct named_pipe; +struct pipe_message +{ + struct list entry; /* entry in message queue */ + size_t size; /* size of message data */ + size_t read_pos; /* already read bytes */ + char *data; /* message data */ + struct async *async; /* async of pending write */ +}; + struct pipe_end { struct object obj; /* object header */ struct fd *fd; /* pipe file descriptor */ unsigned int flags; /* pipe flags */ + struct pipe_end *connection; /* the other end of the pipe */ + struct list message_queue; + struct async_queue *read_q; /* read queue */ + struct async_queue *write_q; /* write queue */ + data_size_t buffer_size;/* size of buffered data that doesn't block caller */ }; struct pipe_server @@ -142,6 +154,16 @@ static const struct object_ops named_pipe_ops = named_pipe_destroy /* destroy */ }; +/* common server and client pipe end functions */ +static obj_handle_t pipe_end_write( struct fd *fd, const async_data_t *async_data, int blocking, + file_pos_t pos, data_size_t *written ); +static obj_handle_t pipe_end_read( struct fd *fd, const async_data_t *async_data, int blocking, + file_pos_t pos ); +static void pipe_end_queue_async( struct fd *fd, const async_data_t *data, int type, int count ); +static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue ); +static int pipe_end_cancel_async( struct fd *fd, struct process *process, struct thread *thread, client_ptr_t iosb ); +static struct async *pipe_end_find_async( struct fd *fd, struct process *process, client_ptr_t client_async ); + /* server end functions */ static void pipe_server_dump( struct object *obj, int verbose ); static struct fd *pipe_server_get_fd( struct object *obj ); @@ -178,13 +200,14 @@ static const struct fd_ops pipe_server_fd_ops = default_fd_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ pipe_server_get_fd_type, /* get_fd_type */ - no_fd_read, /* read */ - no_fd_write, /* write */ + pipe_end_read, /* read */ + pipe_end_write, /* write */ pipe_server_flush, /* flush */ pipe_server_ioctl, /* ioctl */ - default_fd_queue_async, /* queue_async */ - default_fd_reselect_async, /* reselect_async */ - default_fd_cancel_async, /* cancel_async */ + pipe_end_queue_async, /* queue_async */ + pipe_end_reselect_async, /* reselect_async */ + pipe_end_cancel_async, /* cancel_async */ + pipe_end_find_async /* find_async */ }; /* client end functions */ @@ -193,6 +216,8 @@ static int pipe_client_signaled( struct object *obj, struct wait_queue_entry *en static struct fd *pipe_client_get_fd( struct object *obj ); static void pipe_client_destroy( struct object *obj ); static obj_handle_t pipe_client_flush( struct fd *fd, const async_data_t *async, int blocking ); +static obj_handle_t pipe_client_ioctl( struct fd *fd, ioctl_code_t code, const async_data_t *async, + int blocking ); static enum server_fd_type pipe_client_get_fd_type( struct fd *fd ); static const struct object_ops pipe_client_ops = @@ -222,13 +247,14 @@ static const struct fd_ops pipe_client_fd_ops = default_fd_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ pipe_client_get_fd_type, /* get_fd_type */ - no_fd_read, /* read */ - no_fd_write, /* write */ + pipe_end_read, /* read */ + pipe_end_write, /* write */ pipe_client_flush, /* flush */ - default_fd_ioctl, /* ioctl */ - default_fd_queue_async, /* queue_async */ - default_fd_reselect_async, /* reselect_async */ - default_fd_cancel_async /* cancel_async */ + pipe_client_ioctl, /* ioctl */ + pipe_end_queue_async, /* queue_async */ + pipe_end_reselect_async, /* reselect_async */ + pipe_end_cancel_async, /* cancel_async */ + pipe_end_find_async /* find_async */ }; static void named_pipe_device_dump( struct object *obj, int verbose ); @@ -276,9 +302,16 @@ static const struct fd_ops named_pipe_device_fd_ops = named_pipe_device_ioctl, /* ioctl */ default_fd_queue_async, /* queue_async */ default_fd_reselect_async, /* reselect_async */ - default_fd_cancel_async /* cancel_async */ + default_fd_cancel_async, /* cancel_async */ + default_fd_find_async /* find_async */ }; +/* Returns if we handle I/O via server calls. Currently message-mode pipes are handled this way. */ +static int use_server_io( struct pipe_end *pipe_end ) +{ + return pipe_end->flags & NAMED_PIPE_MESSAGE_STREAM_WRITE; +} + static void named_pipe_dump( struct object *obj, int verbose ) { fputs( "Named pipe\n", stderr ); @@ -373,6 +406,56 @@ static void notify_empty( struct pipe_server *server ) fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); } +static void wake_message( struct pipe_message *message ) +{ + struct async *async = message->async; + struct iosb *iosb; + + if (!async) return; + + message->async = NULL; + iosb = async_get_iosb( async ); + iosb->status = STATUS_SUCCESS; + iosb->result = message->size; + async_terminate( async, iosb->result ? STATUS_ALERTED : STATUS_SUCCESS ); + release_object( async ); + release_iosb( iosb ); +} + +static void free_message( struct pipe_message *message ) +{ + list_remove( &message->entry ); + if( message->data ) free( message->data ); + free( message ); +} + +static void pipe_end_disconnect( struct pipe_end *pipe_end, NTSTATUS status ) +{ + struct pipe_end *connection = pipe_end->connection; + + pipe_end->connection = NULL; + + if (use_server_io( pipe_end )) + { + struct pipe_message *message, *next; + struct async *async; + if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status ); + async_wake_up( pipe_end->read_q, status ); + LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry ) + { + if (!(async = message->async)) continue; + free_message( message ); + async_terminate( async, status ); + release_object( async ); + } + } + if (connection) + { + connection->connection = NULL; + pipe_end_disconnect( connection, status ); + } +} + static void do_disconnect( struct pipe_server *server ) { /* we may only have a server fd, if the client disconnected */ @@ -380,27 +463,49 @@ static void do_disconnect( struct pipe_server *server ) { assert( server->client->server == server ); assert( server->client->pipe_end.fd ); - release_object( server->client->pipe_end.fd ); - server->client->pipe_end.fd = NULL; + if (!use_server_io( &server->pipe_end )) + { + release_object( server->client->pipe_end.fd ); + server->client->pipe_end.fd = NULL; + } } assert( server->pipe_end.fd ); - shutdown( get_unix_fd( server->pipe_end.fd ), SHUT_RDWR ); + if (!use_server_io( &server->pipe_end )) + shutdown( get_unix_fd( server->pipe_end.fd ), SHUT_RDWR ); release_object( server->pipe_end.fd ); server->pipe_end.fd = NULL; } +static void pipe_end_destroy( struct pipe_end *pipe_end ) +{ + struct pipe_message *message; + + while (!list_empty( &pipe_end->message_queue )) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + assert( !message->async ); + free_message( message ); + } + + free_async_queue( pipe_end->read_q ); + free_async_queue( pipe_end->write_q ); +} + static void pipe_server_destroy( struct object *obj) { struct pipe_server *server = (struct pipe_server *)obj; assert( obj->ops == &pipe_server_ops ); + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_BROKEN ); + if (server->pipe_end.fd) { notify_empty( server ); do_disconnect( server ); } + pipe_end_destroy( &server->pipe_end ); if (server->client) { server->client->server = NULL; @@ -422,6 +527,8 @@ static void pipe_client_destroy( struct object *obj) assert( obj->ops == &pipe_client_ops ); + pipe_end_disconnect( &client->pipe_end, STATUS_PIPE_BROKEN ); + if (server) { notify_empty( server ); @@ -446,6 +553,8 @@ static void pipe_client_destroy( struct object *obj) server->client = NULL; client->server = NULL; } + + pipe_end_destroy( &client->pipe_end ); if (client->pipe_end.fd) release_object( client->pipe_end.fd ); } @@ -528,6 +637,9 @@ static int pipe_data_remaining( struct pipe_server *server ) assert( server->client ); + if (use_server_io( &server->pipe_end )) + return !list_empty( &server->client->pipe_end.message_queue ); + fd = get_unix_fd( server->client->pipe_end.fd ); if (fd < 0) return 0; @@ -561,6 +673,9 @@ static obj_handle_t pipe_end_flush( struct pipe_end *pipe_end, const async_data_ obj_handle_t handle = 0; struct async *async; + if (use_server_io( pipe_end ) && (!pipe_end->connection || list_empty( &pipe_end->connection->message_queue ))) + return 0; + if ((async = fd_queue_async( pipe_end->fd, async_data, NULL, ASYNC_TYPE_WAIT ))) { if (blocking) handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 ); @@ -582,7 +697,7 @@ static obj_handle_t pipe_server_flush( struct fd *fd, const async_data_t *async_ handle = pipe_end_flush( &server->pipe_end, async_data, blocking ); /* there's no unix way to be alerted when a pipe becomes empty, so resort to polling */ - if (handle && !server->flush_poll) + if (handle && !use_server_io( &server->pipe_end ) && !server->flush_poll) server->flush_poll = add_timeout_user( -TICKS_PER_SEC / 10, check_flushed, server ); return handle; } @@ -593,6 +708,278 @@ static obj_handle_t pipe_client_flush( struct fd *fd, const async_data_t *async, return 0; } +static void message_queue_read( struct pipe_end *pipe_end, struct iosb *iosb ) +{ + const int is_message_mode = pipe_end->flags & NAMED_PIPE_MESSAGE_STREAM_READ; + struct pipe_message *message; + + if (is_message_mode) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + iosb->out_size = min( iosb->out_size, message->size - message->read_pos ); + iosb->status = message->read_pos + iosb->out_size < message->size ? STATUS_BUFFER_OVERFLOW : STATUS_SUCCESS; + } + else + { + data_size_t avail = 0; + LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry ) + { + avail += message->size - message->read_pos; + if (avail >= iosb->out_size) break; + } + iosb->out_size = min( iosb->out_size, avail ); + iosb->status = STATUS_SUCCESS; + } + + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + if (!message->read_pos && message->size == iosb->out_size) /* fast path */ + { + iosb->out_data = message->data; + message->data = NULL; + wake_message( message ); + free_message( message ); + } + else + { + data_size_t write_pos = 0, writing; + char *buf = NULL; + + if (iosb->out_size && !(buf = iosb->out_data = malloc( iosb->out_size ))) + { + iosb->out_size = 0; + iosb->status = STATUS_NO_MEMORY; + return; + } + + do { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + writing = min( iosb->out_size - write_pos, message->size - message->read_pos ); + if (writing) memcpy( buf + write_pos, message->data + message->read_pos, writing ); + write_pos += writing; + message->read_pos += writing; + if (message->read_pos == message->size) + { + wake_message(message); + free_message(message); + } + } while (write_pos < iosb->out_size); + } + iosb->result = iosb->out_size; +} + +/* We call async_terminate in our reselect implementation, which causes recursive reselect. + * We're not interested in such reselect calls, so we ignore them. */ +static int ignore_reselect; + +static void reselect_write_queue( struct pipe_end *pipe_end ); + +static void reselect_read_queue( struct pipe_end *pipe_end ) +{ + struct async *async; + struct iosb *iosb; + int read_done = 0; + + ignore_reselect = 1; + + while (!list_empty( &pipe_end->message_queue) && (async = find_pending_async( pipe_end->read_q ))) + { + iosb = async_get_iosb( async ); + message_queue_read( pipe_end, iosb ); + async_terminate( async, iosb->result ? STATUS_ALERTED : iosb->status ); + release_iosb( iosb ); + read_done++; + } + + ignore_reselect = 0; + + if (pipe_end->connection) + { + if (list_empty( &pipe_end->message_queue )) + fd_async_wake_up( pipe_end->connection->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); + else if (read_done) + reselect_write_queue( pipe_end->connection ); + } +} + +static void reselect_write_queue( struct pipe_end *pipe_end ) +{ + struct pipe_message *message, *next; + struct pipe_end *reader = pipe_end->connection; + data_size_t avail = 0; + struct iosb *iosb; + struct async *async; + + if (!reader) return; + + ignore_reselect = 1; + + LIST_FOR_EACH_ENTRY_SAFE( message, next, &reader->message_queue, struct pipe_message, entry ) + { + if ((async = message->async)) + { + iosb = async_get_iosb( async ); + if (iosb->status != STATUS_PENDING) + { + message->async = NULL; + free_message( message ); + release_object( async ); + release_iosb( iosb ); + continue; + } + release_iosb( iosb ); + } + avail += message->size - message->read_pos; + if (message->async && (avail <= reader->buffer_size || !message->size)) + wake_message( message ); + } + + ignore_reselect = 0; + reselect_read_queue( reader ); +} + +static obj_handle_t pipe_end_read( struct fd *fd, const async_data_t *async_data, int blocking, + file_pos_t pos ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + obj_handle_t handle = 0; + struct async *async; + struct iosb *iosb; + + if (!use_server_io( pipe_end )) + { + set_error( STATUS_NOT_SUPPORTED ); + return 0; + } + + if (!pipe_end->connection && list_empty( &pipe_end->message_queue )) + { + set_error(STATUS_PIPE_BROKEN); + return 0; + } + + if (!pipe_end->read_q && !(pipe_end->read_q = create_async_queue( fd ))) return 0; + + if (!(iosb = alloc_iosb( NULL, 0, get_reply_max_size() ))) return 0; + if (!(async = create_async( current, pipe_end->read_q, async_data, iosb ))) + { + release_iosb( iosb ); + return 0; + } + + reselect_read_queue( pipe_end ); + + /* FIXME: we alloc wait handle for non-blocking reads that are already completed so that APC has a chance to transfer + * result to client immediately. It would be better to pass response directly in here, but that requires overlapped + * support on client side. */ + if (!(blocking || iosb->status != STATUS_PENDING) || (handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 ))) + set_error( STATUS_PENDING ); + + release_iosb( iosb ); + release_object( async ); + return handle; +} + +static obj_handle_t pipe_end_write( struct fd *fd, const async_data_t *async_data, int blocking, + file_pos_t pos, data_size_t *written ) +{ + struct pipe_end *write_end = get_fd_user( fd ); + struct pipe_end *read_end = write_end->connection; + data_size_t size = get_req_data_size(); + struct pipe_message *message; + obj_handle_t handle = 0; + struct async *async; + struct iosb *iosb; + + if (!use_server_io( write_end )) + { + set_error( STATUS_NOT_SUPPORTED ); + return 0; + } + + if (!read_end) + { + set_error( STATUS_PIPE_CLOSING ); + return 0; + } + + if (!write_end->write_q && !(write_end->write_q = create_async_queue( fd ))) return 0; + + if (!(message = mem_alloc(sizeof(*message)))) return 0; + message->size = size; + message->async = NULL; + message->read_pos = 0; + message->data = NULL; + + if (size && !(message->data = memdup(get_req_data(), size))) + { + free(message); + return 0; + } + + list_add_tail( &read_end->message_queue, &message->entry ); + + if (!(iosb = alloc_iosb( NULL, 0, 0 ))) return 0; + if (!(async = create_async( current, write_end->write_q, async_data, iosb ))) { + release_iosb( iosb ); + return 0; + } + message->async = (struct async *)grab_object( async ); + + reselect_write_queue( write_end ); + + /* FIXME: we alloc wait handle for non-blocking writes that are already completed so that APC has a chance to transfer + * result to client immediately. It would be better to pass response directly in here, but that requires overlapped + * support on client side. */ + if (!(blocking || iosb->status != STATUS_PENDING) || (handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 ))) + set_error( STATUS_PENDING ); + + release_object( async ); + release_iosb( iosb ); + *written = 0; + return handle; +} + +static void pipe_end_queue_async( struct fd *fd, const async_data_t *data, int type, int count ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + if (use_server_io( pipe_end )) no_fd_queue_async( fd, data, type, count ); + else default_fd_queue_async( fd, data, type, count ); +} + +static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + + if (ignore_reselect) return; + if (!use_server_io( pipe_end )) return default_fd_reselect_async( fd, queue ); + + if (pipe_end->write_q && pipe_end->write_q == queue) + reselect_write_queue( pipe_end ); + else if (pipe_end->read_q && pipe_end->read_q == queue) + reselect_read_queue( pipe_end ); +} + +static int pipe_end_cancel_async( struct fd *fd, struct process *process, struct thread *thread, client_ptr_t iosb ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + int n = 0; + + n += async_wake_up_by( pipe_end->read_q, process, thread, iosb, STATUS_CANCELLED ); + n += async_wake_up_by( pipe_end->write_q, process, thread, iosb, STATUS_CANCELLED ); + return n + default_fd_cancel_async( fd, process, thread, iosb ); +} + +struct async *pipe_end_find_async( struct fd *fd, struct process *process, client_ptr_t client_async ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + struct async *async; + + async = find_async( pipe_end->read_q, process, client_async); + if (!async) async = find_async( pipe_end->write_q, process, client_async ); + if (!async) async = default_fd_find_async( fd, process, client_async ); + return async; +} + static inline int is_overlapped( unsigned int options ) { return !(options & (FILE_SYNCHRONOUS_IO_ALERT | FILE_SYNCHRONOUS_IO_NONALERT)); @@ -608,6 +995,44 @@ static enum server_fd_type pipe_client_get_fd_type( struct fd *fd ) return FD_TYPE_PIPE; } +static void pipe_end_peek( struct pipe_end *pipe_end ) +{ + unsigned reply_size = get_reply_max_size(); + FILE_PIPE_PEEK_BUFFER *buffer; + struct pipe_message *message; + size_t avail = 0; + + if (!use_server_io( pipe_end )) + { + set_error( STATUS_NOT_SUPPORTED ); + return; + } + + if (reply_size < FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data )) + { + set_error(STATUS_INFO_LENGTH_MISMATCH); + return; + } + reply_size -= FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data ); + + LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry ) + avail += message->size - message->read_pos; + reply_size = min( reply_size, avail ); + + if (avail) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + reply_size = min( reply_size, message->size ); + } + + if (!(buffer = set_reply_data_size(FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data[reply_size] )))) return; + buffer->NamedPipeState = 0; /* FIXME */ + buffer->ReadDataAvailable = avail; + buffer->NumberOfMessages = 0; /* FIXME */ + buffer->MessageLength = 0; /* FIXME */ + if (avail) memcpy( buffer->Data, message->data + message->read_pos, reply_size ); +} + static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, const async_data_t *async_data, int blocking ) { @@ -658,11 +1083,13 @@ static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, const a /* dump the client and server fds, but keep the pointers around - client loses all waiting data */ + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_DISCONNECTED ); do_disconnect( server ); set_server_state( server, ps_disconnected_server ); break; case ps_wait_disconnect: assert( !server->client ); + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_DISCONNECTED ); do_disconnect( server ); set_server_state( server, ps_wait_connect ); break; @@ -677,6 +1104,26 @@ static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, const a } return 0; + case FSCTL_PIPE_PEEK: + pipe_end_peek( &server->pipe_end ); + return 0; + + default: + return default_fd_ioctl( fd, code, async_data, blocking ); + } +} + +static obj_handle_t pipe_client_ioctl( struct fd *fd, ioctl_code_t code, const async_data_t *async_data, + int blocking ) +{ + struct pipe_client *client = get_fd_user( fd ); + + switch(code) + { + case FSCTL_PIPE_PEEK: + pipe_end_peek( &client->pipe_end ); + return 0; + default: return default_fd_ioctl( fd, code, async_data, blocking ); } @@ -690,10 +1137,15 @@ static struct pipe_server *get_pipe_server_obj( struct process *process, return (struct pipe_server *) obj; } -static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags ) +static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, data_size_t buffer_size ) { pipe_end->fd = NULL; pipe_end->flags = pipe_flags; + pipe_end->buffer_size = buffer_size; + pipe_end->connection = NULL; + pipe_end->read_q = NULL; + pipe_end->write_q = NULL; + list_init( &pipe_end->message_queue); } static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned int options, @@ -709,7 +1161,7 @@ static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned server->client = NULL; server->flush_poll = NULL; server->options = options; - init_pipe_end( &server->pipe_end, pipe_flags ); + init_pipe_end( &server->pipe_end, pipe_flags, pipe->insize ); list_add_head( &pipe->servers, &server->entry ); grab_object( pipe ); @@ -723,7 +1175,7 @@ static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned return server; } -static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int pipe_flags ) +static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int pipe_flags, data_size_t buffer_size ) { struct pipe_client *client; @@ -733,7 +1185,7 @@ static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int client->server = NULL; client->flags = flags; - init_pipe_end( &client->pipe_end, pipe_flags ); + init_pipe_end( &client->pipe_end, pipe_flags, buffer_size ); return client; } @@ -797,9 +1249,24 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc return NULL; } - if ((client = create_pipe_client( options, pipe->flags ))) + if ((client = create_pipe_client( options, pipe->flags, pipe->outsize ))) { - if (!socketpair( PF_UNIX, SOCK_STREAM, 0, fds )) + if (use_server_io( &server->pipe_end )) + { + client->pipe_end.fd = alloc_pseudo_fd( &pipe_client_fd_ops, &client->pipe_end.obj, options ); + if (client->pipe_end.fd) + { + set_fd_signaled( client->pipe_end.fd, 1 ); + server->pipe_end.fd = (struct fd *)grab_object( server->ioctl_fd ); + set_no_fd_status( server->ioctl_fd, STATUS_BAD_DEVICE_TYPE ); + } + else + { + release_object( client ); + client = NULL; + } + } + else if (!socketpair( PF_UNIX, SOCK_STREAM, 0, fds )) { assert( !server->pipe_end.fd ); @@ -827,11 +1294,6 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc allow_fd_caching( client->pipe_end.fd ); allow_fd_caching( server->pipe_end.fd ); fd_copy_completion( server->ioctl_fd, server->pipe_end.fd ); - if (server->state == ps_wait_open) - fd_async_wake_up( server->ioctl_fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); - set_server_state( server, ps_connected_server ); - server->client = client; - client->server = server; } else { @@ -845,6 +1307,16 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc release_object( client ); client = NULL; } + if (client) + { + if (server->state == ps_wait_open) + fd_async_wake_up( server->ioctl_fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); + set_server_state( server, ps_connected_server ); + server->client = client; + client->server = server; + server->pipe_end.connection = &client->pipe_end; + client->pipe_end.connection = &server->pipe_end; + } } release_object( server ); return &client->pipe_end.obj;