From: Jacek Caban Subject: [PATCH 1/2] server: Added server side named pipe read and write implementation and use it for message mode pipes. Message-Id: Date: Wed, 1 Mar 2017 14:23:01 +0100 Fixed pipe_end_peek as spotted by Sebastian. Signed-off-by: Jacek Caban --- dlls/kernel32/tests/pipe.c | 28 ++- dlls/ntdll/file.c | 7 +- dlls/ntdll/tests/file.c | 2 +- server/async.c | 9 + server/file.h | 1 + server/named_pipe.c | 458 ++++++++++++++++++++++++++++++++++++++++++--- 6 files changed, 459 insertions(+), 46 deletions(-) diff --git a/dlls/kernel32/tests/pipe.c b/dlls/kernel32/tests/pipe.c index 45c6fd9..aafebd6 100644 --- a/dlls/kernel32/tests/pipe.c +++ b/dlls/kernel32/tests/pipe.c @@ -308,8 +308,7 @@ static void test_CreateNamedPipe(int pipemode) } else { - /* ok(readden == sizeof(obuf), "peek3 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek3 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek3 got %d bytes\n", readden); } ok(avail == sizeof(obuf) + sizeof(obuf2), "peek3 got %d bytes available\n", avail); pbuf = ibuf; @@ -339,8 +338,7 @@ static void test_CreateNamedPipe(int pipemode) } else { - /* ok(readden == sizeof(obuf), "peek4 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek4 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek4 got %d bytes\n", readden); } ok(avail == sizeof(obuf) + sizeof(obuf2), "peek4 got %d bytes available\n", avail); pbuf = ibuf; @@ -381,9 +379,7 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hnp, obuf2, sizeof(obuf2), &written, NULL), " WriteFile5b\n"); ok(written == sizeof(obuf2), "write file len 3b\n"); ok(PeekNamedPipe(hFile, ibuf, sizeof(ibuf), &readden, &avail, NULL), "Peek5\n"); - /* currently the Wine behavior depends on the kernel version */ - /* ok(readden == sizeof(obuf), "peek5 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek5 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek5 got %d bytes\n", readden); ok(avail == sizeof(obuf) + sizeof(obuf2), "peek5 got %d bytes available\n", avail); pbuf = ibuf; @@ -416,9 +412,7 @@ static void test_CreateNamedPipe(int pipemode) ok(WriteFile(hFile, obuf2, sizeof(obuf2), &written, NULL), " WriteFile6b\n"); ok(written == sizeof(obuf2), "write file len 6b\n"); ok(PeekNamedPipe(hnp, ibuf, sizeof(ibuf), &readden, &avail, NULL), "Peek6\n"); - /* currently the Wine behavior depends on the kernel version */ - /* ok(readden == sizeof(obuf), "peek6 got %d bytes\n", readden); */ - if (readden != sizeof(obuf)) todo_wine ok(0, "peek6 got %d bytes\n", readden); + ok(readden == sizeof(obuf), "peek6 got %d bytes\n", readden); ok(avail == sizeof(obuf) + sizeof(obuf2), "peek6b got %d bytes available\n", avail); pbuf = ibuf; @@ -496,7 +490,6 @@ static void test_CreateNamedPipe(int pipemode) ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); ok(readden == 4, "read got %d bytes 9\n", readden); SetLastError(0xdeadbeef); - todo_wine ok(!ReadFile(hFile, ibuf + 4, 4, &readden, NULL), "ReadFile 9\n"); todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 9\n"); @@ -536,6 +529,7 @@ static void test_CreateNamedPipe(int pipemode) memset(ibuf, 0, sizeof(ibuf)); SetLastError(0xdeadbeef); ret = RpcReadFile(hnp, ibuf, 4, &readden, NULL); + todo_wine ok(!ret, "RpcReadFile 10\n"); todo_wine ok(GetLastError() == ERROR_MORE_DATA, "wrong error 10\n"); @@ -1424,7 +1418,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hfile, buffer, 0, &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); numbytes = 0xdeadbeef; @@ -1470,13 +1464,13 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hfile, buffer, sizeof(buffer), &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); SetLastError(0xdeadbeef); ret = ReadFile(hfile, buffer, 0, &numbytes, NULL); ok(!ret, "ReadFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_BROKEN_PIPE, "expected ERROR_BROKEN_PIPE, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_BROKEN_PIPE, "expected ERROR_BROKEN_PIPE, got %u\n", GetLastError()); ret = GetNamedPipeHandleStateA(hfile, &state, NULL, NULL, NULL, NULL, 0); ok(ret, "GetNamedPipeHandleState failed with %u\n", GetLastError()); @@ -1487,7 +1481,7 @@ static void test_CloseHandle(void) SetLastError(0xdeadbeef); ret = ReadFile(hfile, buffer, 0, &numbytes, NULL); ok(!ret, "ReadFile unexpectedly succeeded\n"); - todo_wine ok(GetLastError() == ERROR_BROKEN_PIPE, "expected ERROR_BROKEN_PIPE, got %u\n", GetLastError()); + ok(GetLastError() == ERROR_BROKEN_PIPE, "expected ERROR_BROKEN_PIPE, got %u\n", GetLastError()); SetLastError(0xdeadbeef); ret = WriteFile(hfile, testdata, sizeof(testdata), &numbytes, NULL); @@ -1522,7 +1516,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hpipe, buffer, 0, &numbytes, NULL); - todo_wine ok(ret || GetLastError() == ERROR_MORE_DATA /* >= Win 8 */, + ok(ret || GetLastError() == ERROR_MORE_DATA /* >= Win 8 */, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); @@ -1569,7 +1563,7 @@ static void test_CloseHandle(void) numbytes = 0xdeadbeef; memset(buffer, 0, sizeof(buffer)); ret = ReadFile(hpipe, buffer, sizeof(buffer), &numbytes, NULL); - todo_wine ok(ret, "ReadFile failed with %u\n", GetLastError()); + ok(ret, "ReadFile failed with %u\n", GetLastError()); ok(numbytes == 0, "expected 0, got %u\n", numbytes); SetLastError(0xdeadbeef); diff --git a/dlls/ntdll/file.c b/dlls/ntdll/file.c index fd7f3dd..bf95c65 100644 --- a/dlls/ntdll/file.c +++ b/dlls/ntdll/file.c @@ -1726,8 +1726,11 @@ NTSTATUS WINAPI NtFsControlFile(HANDLE handle, HANDLE event, PIO_APC_ROUTINE apc break; } - if ((status = server_get_unix_fd( handle, FILE_READ_DATA, &fd, &needs_close, NULL, NULL ))) - break; + status = server_get_unix_fd( handle, FILE_READ_DATA, &fd, &needs_close, NULL, NULL ); + if (status == STATUS_BAD_DEVICE_TYPE) + return server_ioctl_file( handle, event, apc, apc_context, io, code, + in_buffer, in_size, out_buffer, out_size ); + if (status) break; #ifdef FIONREAD if (ioctl( fd, FIONREAD, &avail ) != 0) diff --git a/dlls/ntdll/tests/file.c b/dlls/ntdll/tests/file.c index 18c3f63..8564595 100644 --- a/dlls/ntdll/tests/file.c +++ b/dlls/ntdll/tests/file.c @@ -1271,7 +1271,7 @@ static void test_iocp_fileio(HANDLE h) ok( completionKey == CKEY_SECOND, "Invalid completion key: %lx\n", completionKey ); ok( ioSb.Information == 0, "Invalid ioSb.Information: %ld\n", ioSb.Information ); /* wine sends wrong status here */ - todo_wine ok( U(ioSb).Status == STATUS_PIPE_BROKEN, "Invalid ioSb.Status: %x\n", U(ioSb).Status); + ok( U(ioSb).Status == STATUS_PIPE_BROKEN, "Invalid ioSb.Status: %x\n", U(ioSb).Status); ok( completionValue == (ULONG_PTR)&o, "Invalid completion value: %lx\n", completionValue ); } } diff --git a/server/async.c b/server/async.c index 4e30c2e..e113681 100644 --- a/server/async.c +++ b/server/async.c @@ -474,6 +474,15 @@ struct iosb *async_get_iosb( struct async *async ) return async->iosb ? (struct iosb *)grab_object( async->iosb ) : NULL; } +/* find the first pending async in queue */ +struct async *find_pending_async( struct async_queue *queue ) +{ + struct async *async; + if (queue) LIST_FOR_EACH_ENTRY( async, &queue->queue, struct async, queue_entry ) + if (async->status == STATUS_PENDING) return (struct async *)grab_object( async ); + return NULL; +} + /* cancels all async I/O */ DECL_HANDLER(cancel_async) { diff --git a/server/file.h b/server/file.h index 8e906d3..398733c 100644 --- a/server/file.h +++ b/server/file.h @@ -188,6 +188,7 @@ extern struct completion *fd_get_completion( struct fd *fd, apc_param_t *p_key ) extern void fd_copy_completion( struct fd *src, struct fd *dst ); extern struct iosb *create_iosb( const void *in_data, data_size_t in_size, data_size_t out_size ); extern struct iosb *async_get_iosb( struct async *async ); +extern struct async *find_pending_async( struct async_queue *queue ); extern void cancel_process_asyncs( struct process *process ); /* access rights that require Unix read permission */ diff --git a/server/named_pipe.c b/server/named_pipe.c index 6861e0f..0408077 100644 --- a/server/named_pipe.c +++ b/server/named_pipe.c @@ -3,6 +3,7 @@ * * Copyright (C) 1998 Alexandre Julliard * Copyright (C) 2001 Mike McCormack + * Copyright 2016 Jacek Caban for CodeWeavers * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -65,11 +66,24 @@ enum pipe_state struct named_pipe; +struct pipe_message +{ + struct list entry; /* entry in message queue */ + size_t read_pos; /* already read bytes */ + struct iosb *iosb; /* message iosb */ + struct async *async; /* async of pending write */ +}; + struct pipe_end { struct object obj; /* object header */ struct fd *fd; /* pipe file descriptor */ unsigned int flags; /* pipe flags */ + struct pipe_end *connection; /* the other end of the pipe */ + struct list message_queue; + struct async_queue *read_q; /* read queue */ + struct async_queue *write_q; /* write queue */ + data_size_t buffer_size;/* size of buffered data that doesn't block caller */ }; struct pipe_server @@ -141,6 +155,12 @@ static const struct object_ops named_pipe_ops = named_pipe_destroy /* destroy */ }; +/* common server and client pipe end functions */ +static obj_handle_t pipe_end_read( struct fd *fd, struct async *async, int blocking, file_pos_t pos ); +static obj_handle_t pipe_end_write( struct fd *fd, struct async *async_data, int blocking, file_pos_t pos ); +static void pipe_end_queue_async( struct fd *fd, struct async *async, int type, int count ); +static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue ); + /* server end functions */ static void pipe_server_dump( struct object *obj, int verbose ); static struct fd *pipe_server_get_fd( struct object *obj ); @@ -177,12 +197,12 @@ static const struct fd_ops pipe_server_fd_ops = default_fd_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ pipe_server_get_fd_type, /* get_fd_type */ - no_fd_read, /* read */ - no_fd_write, /* write */ + pipe_end_read, /* read */ + pipe_end_write, /* write */ pipe_server_flush, /* flush */ pipe_server_ioctl, /* ioctl */ - default_fd_queue_async, /* queue_async */ - default_fd_reselect_async /* reselect_async */ + pipe_end_queue_async, /* queue_async */ + pipe_end_reselect_async /* reselect_async */ }; /* client end functions */ @@ -191,6 +211,8 @@ static int pipe_client_signaled( struct object *obj, struct wait_queue_entry *en static struct fd *pipe_client_get_fd( struct object *obj ); static void pipe_client_destroy( struct object *obj ); static obj_handle_t pipe_client_flush( struct fd *fd, struct async *async, int blocking ); +static obj_handle_t pipe_client_ioctl( struct fd *fd, ioctl_code_t code, struct async *async, + int blocking ); static enum server_fd_type pipe_client_get_fd_type( struct fd *fd ); static const struct object_ops pipe_client_ops = @@ -220,12 +242,12 @@ static const struct fd_ops pipe_client_fd_ops = default_fd_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ pipe_client_get_fd_type, /* get_fd_type */ - no_fd_read, /* read */ - no_fd_write, /* write */ + pipe_end_read, /* read */ + pipe_end_write, /* write */ pipe_client_flush, /* flush */ - default_fd_ioctl, /* ioctl */ - default_fd_queue_async, /* queue_async */ - default_fd_reselect_async /* reselect_async */ + pipe_client_ioctl, /* ioctl */ + pipe_end_queue_async, /* queue_async */ + pipe_end_reselect_async /* reselect_async */ }; static void named_pipe_device_dump( struct object *obj, int verbose ); @@ -275,6 +297,12 @@ static const struct fd_ops named_pipe_device_fd_ops = default_fd_reselect_async /* reselect_async */ }; +/* Returns if we handle I/O via server calls. Currently message-mode pipes are handled this way. */ +static int use_server_io( struct pipe_end *pipe_end ) +{ + return pipe_end->flags & NAMED_PIPE_MESSAGE_STREAM_WRITE; +} + static void named_pipe_dump( struct object *obj, int verbose ) { fputs( "Named pipe\n", stderr ); @@ -368,6 +396,56 @@ static void notify_empty( struct pipe_server *server ) fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); } +static void wake_message( struct pipe_message *message ) +{ + struct async *async = message->async; + + message->async = NULL; + message->iosb->status = STATUS_SUCCESS; + message->iosb->result = message->iosb->in_size; + if (async) + { + async_terminate( async, message->iosb->result ? STATUS_ALERTED : STATUS_SUCCESS ); + release_object( async ); + } +} + +static void free_message( struct pipe_message *message ) +{ + list_remove( &message->entry ); + if (message->iosb) release_object( message->iosb ); + free( message ); +} + +static void pipe_end_disconnect( struct pipe_end *pipe_end, NTSTATUS status ) +{ + struct pipe_end *connection = pipe_end->connection; + + pipe_end->connection = NULL; + + if (use_server_io( pipe_end )) + { + struct pipe_message *message, *next; + struct async *async; + if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status ); + async_wake_up( pipe_end->read_q, status ); + LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry ) + { + async = message->async; + if (async || status == STATUS_PIPE_DISCONNECTED) free_message( message ); + if (!async) continue; + async_terminate( async, status ); + release_object( async ); + } + if (status == STATUS_PIPE_DISCONNECTED) set_fd_signaled( pipe_end->fd, 0 ); + } + if (connection) + { + connection->connection = NULL; + pipe_end_disconnect( connection, status ); + } +} + static void do_disconnect( struct pipe_server *server ) { /* we may only have a server fd, if the client disconnected */ @@ -375,27 +453,49 @@ static void do_disconnect( struct pipe_server *server ) { assert( server->client->server == server ); assert( server->client->pipe_end.fd ); - release_object( server->client->pipe_end.fd ); - server->client->pipe_end.fd = NULL; + if (!use_server_io( &server->pipe_end )) + { + release_object( server->client->pipe_end.fd ); + server->client->pipe_end.fd = NULL; + } } assert( server->pipe_end.fd ); - shutdown( get_unix_fd( server->pipe_end.fd ), SHUT_RDWR ); + if (!use_server_io( &server->pipe_end )) + shutdown( get_unix_fd( server->pipe_end.fd ), SHUT_RDWR ); release_object( server->pipe_end.fd ); server->pipe_end.fd = NULL; } +static void pipe_end_destroy( struct pipe_end *pipe_end ) +{ + struct pipe_message *message; + + while (!list_empty( &pipe_end->message_queue )) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + assert( !message->async ); + free_message( message ); + } + + free_async_queue( pipe_end->read_q ); + free_async_queue( pipe_end->write_q ); +} + static void pipe_server_destroy( struct object *obj) { struct pipe_server *server = (struct pipe_server *)obj; assert( obj->ops == &pipe_server_ops ); + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_BROKEN ); + if (server->pipe_end.fd) { notify_empty( server ); do_disconnect( server ); } + pipe_end_destroy( &server->pipe_end ); if (server->client) { server->client->server = NULL; @@ -417,6 +517,8 @@ static void pipe_client_destroy( struct object *obj) assert( obj->ops == &pipe_client_ops ); + pipe_end_disconnect( &client->pipe_end, STATUS_PIPE_BROKEN ); + if (server) { notify_empty( server ); @@ -438,6 +540,8 @@ static void pipe_client_destroy( struct object *obj) server->client = NULL; client->server = NULL; } + + pipe_end_destroy( &client->pipe_end ); if (client->pipe_end.fd) release_object( client->pipe_end.fd ); } @@ -520,6 +624,9 @@ static int pipe_data_remaining( struct pipe_server *server ) assert( server->client ); + if (use_server_io( &server->pipe_end )) + return !list_empty( &server->client->pipe_end.message_queue ); + fd = get_unix_fd( server->client->pipe_end.fd ); if (fd < 0) return 0; @@ -571,7 +678,7 @@ static obj_handle_t pipe_server_flush( struct fd *fd, struct async *async, int b handle = pipe_end_flush( &server->pipe_end, async, blocking ); /* there's no unix way to be alerted when a pipe becomes empty, so resort to polling */ - if (handle && !server->flush_poll) + if (handle && !use_server_io( &server->pipe_end ) && !server->flush_poll) server->flush_poll = add_timeout_user( -TICKS_PER_SEC / 10, check_flushed, server ); return handle; } @@ -582,6 +689,220 @@ static obj_handle_t pipe_client_flush( struct fd *fd, struct async *async, int b return 0; } +static void message_queue_read( struct pipe_end *pipe_end, struct iosb *iosb ) +{ + struct pipe_message *message; + data_size_t avail = 0; + + LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry ) + { + avail += message->iosb->in_size - message->read_pos; + if (avail >= iosb->out_size) break; + } + iosb->out_size = min( iosb->out_size, avail ); + iosb->status = STATUS_SUCCESS; + + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + if (!message->read_pos && message->iosb->in_size == iosb->out_size) /* fast path */ + { + iosb->out_data = message->iosb->in_data; + message->iosb->in_data = NULL; + wake_message( message ); + free_message( message ); + } + else + { + data_size_t write_pos = 0, writing; + char *buf = NULL; + + if (iosb->out_size && !(buf = iosb->out_data = malloc( iosb->out_size ))) + { + iosb->out_size = 0; + iosb->status = STATUS_NO_MEMORY; + return; + } + + do + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + writing = min( iosb->out_size - write_pos, message->iosb->in_size - message->read_pos ); + if (writing) memcpy( buf + write_pos, (const char *)message->iosb->in_data + message->read_pos, writing ); + write_pos += writing; + message->read_pos += writing; + if (message->read_pos == message->iosb->in_size) + { + wake_message(message); + free_message(message); + } + } while (write_pos < iosb->out_size); + } + iosb->result = iosb->out_size; +} + +/* We call async_terminate in our reselect implementation, which causes recursive reselect. + * We're not interested in such reselect calls, so we ignore them. */ +static int ignore_reselect; + +static void reselect_write_queue( struct pipe_end *pipe_end ); + +static void reselect_read_queue( struct pipe_end *pipe_end ) +{ + struct async *async; + struct iosb *iosb; + int read_done = 0; + + ignore_reselect = 1; + while (!list_empty( &pipe_end->message_queue) && (async = find_pending_async( pipe_end->read_q ))) + { + iosb = async_get_iosb( async ); + message_queue_read( pipe_end, iosb ); + async_terminate( async, iosb->result ? STATUS_ALERTED : iosb->status ); + release_object( async ); + release_object( iosb ); + read_done = 1; + } + ignore_reselect = 0; + + if (pipe_end->connection) + { + if (list_empty( &pipe_end->message_queue )) + fd_async_wake_up( pipe_end->connection->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); + else if (read_done) + reselect_write_queue( pipe_end->connection ); + } +} + +static void reselect_write_queue( struct pipe_end *pipe_end ) +{ + struct pipe_message *message, *next; + struct pipe_end *reader = pipe_end->connection; + data_size_t avail = 0; + + if (!reader) return; + + ignore_reselect = 1; + + LIST_FOR_EACH_ENTRY_SAFE( message, next, &reader->message_queue, struct pipe_message, entry ) + { + if (message->async && message->iosb->status != STATUS_PENDING) + { + release_object( message->async ); + message->async = NULL; + free_message( message ); + } + else + { + avail += message->iosb->in_size - message->read_pos; + if (message->iosb->status == STATUS_PENDING && (avail <= reader->buffer_size || !message->iosb->in_size)) + wake_message( message ); + } + } + + ignore_reselect = 0; + reselect_read_queue( reader ); +} + +static obj_handle_t pipe_end_read( struct fd *fd, struct async *async, int blocking, file_pos_t pos ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + obj_handle_t handle = 0; + + if (!use_server_io( pipe_end )) return no_fd_read( fd, async, blocking, pos ); + + if (!pipe_end->connection && list_empty( &pipe_end->message_queue )) + { + set_error( STATUS_PIPE_BROKEN ); + return 0; + } + + if (!pipe_end->read_q && !(pipe_end->read_q = create_async_queue( fd ))) return 0; + if (!(handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 ))) return 0; + + queue_async( pipe_end->read_q, async ); + reselect_read_queue( pipe_end ); + set_error( STATUS_PENDING ); + + if (!blocking) + { + struct iosb *iosb; + iosb = async_get_iosb( async ); + if (iosb->status == STATUS_PENDING) + { + close_handle( current->process, handle ); + handle = 0; + } + release_object( iosb ); + } + return handle; +} + +static obj_handle_t pipe_end_write( struct fd *fd, struct async *async, int blocking, file_pos_t pos ) +{ + struct pipe_end *write_end = get_fd_user( fd ); + struct pipe_end *read_end = write_end->connection; + struct pipe_message *message; + obj_handle_t handle = 0; + + if (!use_server_io( write_end )) return no_fd_write( fd, async, blocking, pos ); + + if (!read_end) + { + set_error( STATUS_PIPE_DISCONNECTED ); + return 0; + } + + if (!write_end->write_q && !(write_end->write_q = create_async_queue( fd ))) return 0; + if (!(handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 ))) return 0; + + if (!(message = mem_alloc(sizeof(*message)))) + { + close_handle( current->process, handle ); + return 0; + } + message->async = (struct async *)grab_object( async ); + message->iosb = async_get_iosb( async ); + message->read_pos = 0; + list_add_tail( &read_end->message_queue, &message->entry ); + + queue_async( write_end->write_q, async ); + reselect_write_queue( write_end ); + set_error( STATUS_PENDING ); + + if (!blocking) + { + struct iosb *iosb; + iosb = async_get_iosb( async ); + if (iosb->status == STATUS_PENDING) + { + close_handle( current->process, handle ); + handle = 0; + } + release_object( iosb ); + } + return handle; +} + +static void pipe_end_queue_async( struct fd *fd, struct async *async, int type, int count ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + if (use_server_io( pipe_end )) no_fd_queue_async( fd, async, type, count ); + else default_fd_queue_async( fd, async, type, count ); +} + +static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue ) +{ + struct pipe_end *pipe_end = get_fd_user( fd ); + + if (ignore_reselect) return; + + if (!use_server_io( pipe_end )) + default_fd_reselect_async( fd, queue ); + else if (pipe_end->write_q && pipe_end->write_q == queue) + reselect_write_queue( pipe_end ); + else if (pipe_end->read_q && pipe_end->read_q == queue) + reselect_read_queue( pipe_end ); +} + static inline int is_overlapped( unsigned int options ) { return !(options & (FILE_SYNCHRONOUS_IO_ALERT | FILE_SYNCHRONOUS_IO_NONALERT)); @@ -597,6 +918,44 @@ static enum server_fd_type pipe_client_get_fd_type( struct fd *fd ) return FD_TYPE_PIPE; } +static void pipe_end_peek( struct pipe_end *pipe_end ) +{ + unsigned reply_size = get_reply_max_size(); + FILE_PIPE_PEEK_BUFFER *buffer; + struct pipe_message *message; + data_size_t avail = 0; + + if (!use_server_io( pipe_end )) + { + set_error( STATUS_NOT_SUPPORTED ); + return; + } + + if (reply_size < FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data )) + { + set_error( STATUS_INFO_LENGTH_MISMATCH ); + return; + } + reply_size -= FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data ); + + LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry ) + avail += message->iosb->in_size - message->read_pos; + + if (avail) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + reply_size = min( reply_size, message->iosb->in_size - message->read_pos ); + } + else reply_size = 0; + + if (!(buffer = set_reply_data_size(FIELD_OFFSET( FILE_PIPE_PEEK_BUFFER, Data[reply_size] )))) return; + buffer->NamedPipeState = 0; /* FIXME */ + buffer->ReadDataAvailable = avail; + buffer->NumberOfMessages = 0; /* FIXME */ + buffer->MessageLength = 0; /* FIXME */ + if (reply_size) memcpy( buffer->Data, (const char *)message->iosb->in_data + message->read_pos, reply_size ); +} + static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct async *async, int blocking ) { @@ -641,6 +1000,7 @@ static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct notify_empty( server ); /* dump the client and server fds - client loses all waiting data */ + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_DISCONNECTED ); do_disconnect( server ); server->client->server = NULL; server->client = NULL; @@ -648,6 +1008,7 @@ static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct break; case ps_wait_disconnect: assert( !server->client ); + pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_DISCONNECTED ); do_disconnect( server ); set_server_state( server, ps_wait_connect ); break; @@ -661,6 +1022,26 @@ static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct } return 0; + case FSCTL_PIPE_PEEK: + pipe_end_peek( &server->pipe_end ); + return 0; + + default: + return default_fd_ioctl( fd, code, async, blocking ); + } +} + +static obj_handle_t pipe_client_ioctl( struct fd *fd, ioctl_code_t code, struct async *async, + int blocking ) +{ + struct pipe_client *client = get_fd_user( fd ); + + switch(code) + { + case FSCTL_PIPE_PEEK: + pipe_end_peek( &client->pipe_end ); + return 0; + default: return default_fd_ioctl( fd, code, async, blocking ); } @@ -674,10 +1055,15 @@ static struct pipe_server *get_pipe_server_obj( struct process *process, return (struct pipe_server *) obj; } -static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags ) +static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, data_size_t buffer_size ) { pipe_end->fd = NULL; pipe_end->flags = pipe_flags; + pipe_end->connection = NULL; + pipe_end->buffer_size = buffer_size; + pipe_end->read_q = NULL; + pipe_end->write_q = NULL; + list_init( &pipe_end->message_queue); } static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned int options, @@ -693,7 +1079,7 @@ static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned server->client = NULL; server->flush_poll = NULL; server->options = options; - init_pipe_end( &server->pipe_end, pipe_flags ); + init_pipe_end( &server->pipe_end, pipe_flags, pipe->insize ); list_add_head( &pipe->servers, &server->entry ); grab_object( pipe ); @@ -707,7 +1093,7 @@ static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned return server; } -static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int pipe_flags ) +static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int pipe_flags, data_size_t buffer_size ) { struct pipe_client *client; @@ -717,7 +1103,7 @@ static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int client->server = NULL; client->flags = flags; - init_pipe_end( &client->pipe_end, pipe_flags ); + init_pipe_end( &client->pipe_end, pipe_flags, buffer_size ); return client; } @@ -781,9 +1167,24 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc return NULL; } - if ((client = create_pipe_client( options, pipe->flags ))) + if ((client = create_pipe_client( options, pipe->flags, pipe->outsize ))) { - if (!socketpair( PF_UNIX, SOCK_STREAM, 0, fds )) + if (use_server_io( &server->pipe_end )) + { + client->pipe_end.fd = alloc_pseudo_fd( &pipe_client_fd_ops, &client->pipe_end.obj, options ); + if (client->pipe_end.fd) + { + set_fd_signaled( client->pipe_end.fd, 1 ); + server->pipe_end.fd = (struct fd *)grab_object( server->ioctl_fd ); + set_no_fd_status( server->ioctl_fd, STATUS_BAD_DEVICE_TYPE ); + } + else + { + release_object( client ); + client = NULL; + } + } + else if (!socketpair( PF_UNIX, SOCK_STREAM, 0, fds )) { assert( !server->pipe_end.fd ); @@ -808,14 +1209,7 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc server->pipe_end.fd = create_anonymous_fd( &pipe_server_fd_ops, fds[0], &server->pipe_end.obj, server->options ); if (client->pipe_end.fd && server->pipe_end.fd) { - allow_fd_caching( client->pipe_end.fd ); - allow_fd_caching( server->pipe_end.fd ); fd_copy_completion( server->ioctl_fd, server->pipe_end.fd ); - if (server->state == ps_wait_open) - fd_async_wake_up( server->ioctl_fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); - set_server_state( server, ps_connected_server ); - server->client = client; - client->server = server; } else { @@ -829,6 +1223,18 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc release_object( client ); client = NULL; } + if (client) + { + allow_fd_caching( client->pipe_end.fd ); + allow_fd_caching( server->pipe_end.fd ); + if (server->state == ps_wait_open) + fd_async_wake_up( server->ioctl_fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); + set_server_state( server, ps_connected_server ); + server->client = client; + client->server = server; + server->pipe_end.connection = &client->pipe_end; + client->pipe_end.connection = &server->pipe_end; + } } release_object( server ); return &client->pipe_end.obj;