From: Derek Lesho Subject: [PATCH v2 2/6] winegstreamer: Merge wg_parser_disconnect and wg_parser_destroy. Message-Id: <20210917195858.42475-2-dlesho@codeweavers.com> Date: Fri, 17 Sep 2021 15:58:54 -0400 In-Reply-To: <20210917195858.42475-1-dlesho@codeweavers.com> References: <20210917195858.42475-1-dlesho@codeweavers.com> Signed-off-by: Derek Lesho --- v2: - Removed sink_connected. - Fixed race condition with pad_removed_cb being called for the decodebin sink crashing due to stream_count no longer being set to 0. --- dlls/winegstreamer/gst_private.h | 1 - dlls/winegstreamer/media_source.c | 17 +-- dlls/winegstreamer/quartz_parser.c | 39 +++---- dlls/winegstreamer/wg_parser.c | 159 +++++++++++------------------ 4 files changed, 80 insertions(+), 136 deletions(-) diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 49e06b31369..22d9547ed72 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -165,7 +165,6 @@ struct unix_funcs void (CDECL *wg_parser_destroy)(struct wg_parser *parser); HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); - void (CDECL *wg_parser_disconnect)(struct wg_parser *parser); void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser); diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 825bad8da27..6c2bf92e2a2 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -103,7 +103,6 @@ struct media_source LONGLONG start_time; HANDLE read_thread; - bool read_thread_shutdown; }; static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) @@ -538,7 +537,7 @@ static DWORD CALLBACK read_thread(void *arg) TRACE("Starting read thread for media source %p.\n", source); - while (!source->read_thread_shutdown) + for (;;) { uint64_t offset; ULONG ret_size; @@ -546,7 +545,7 @@ static DWORD CALLBACK read_thread(void *arg) HRESULT hr; if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) - continue; + break; if (offset >= file_size) size = 0; @@ -1234,9 +1233,8 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) source->state = SOURCE_SHUTDOWN; - unix_funcs->wg_parser_disconnect(source->wg_parser); + unix_funcs->wg_parser_destroy(source->wg_parser); - source->read_thread_shutdown = true; WaitForSingleObject(source->read_thread, INFINITE); CloseHandle(source->read_thread); @@ -1257,8 +1255,6 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) IMFMediaStream_Release(&stream->IMFMediaStream_iface); } - unix_funcs->wg_parser_destroy(source->wg_parser); - free(source->streams); MFUnlockWorkQueue(source->async_commands_queue); @@ -1426,16 +1422,13 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ free(stream); } free(object->streams); - if (stream_count != UINT_MAX) - unix_funcs->wg_parser_disconnect(object->wg_parser); + if (object->wg_parser) + unix_funcs->wg_parser_destroy(object->wg_parser); if (object->read_thread) { - object->read_thread_shutdown = true; WaitForSingleObject(object->read_thread, INFINITE); CloseHandle(object->read_thread); } - if (object->wg_parser) - unix_funcs->wg_parser_destroy(object->wg_parser); if (object->async_commands_queue) MFUnlockWorkQueue(object->async_commands_queue); if (object->event_queue) diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index a8e7e3d979f..bf69a881d57 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -59,6 +59,7 @@ struct parser HANDLE read_thread; + struct wg_parser * (*parser_create)(void); BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -793,14 +794,14 @@ static DWORD CALLBACK read_thread(void *arg) TRACE("Starting read thread for filter %p.\n", filter); - while (filter->sink_connected) + for(;;) { uint64_t offset; uint32_t size; HRESULT hr; if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) - continue; + break; if (offset >= file_size) size = 0; @@ -869,8 +870,6 @@ static void parser_destroy(struct strmbase_filter *iface) IAsyncReader_Release(filter->reader); filter->reader = NULL; - unix_funcs->wg_parser_destroy(filter->wg_parser); - strmbase_sink_cleanup(&filter->sink); strmbase_filter_cleanup(&filter->filter); free(filter); @@ -975,6 +974,12 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons IAsyncReader_Length(filter->reader, &file_size, &unused); + if (!(filter->wg_parser = filter->parser_create())) + { + hr = E_OUTOFMEMORY; + goto err; + } + filter->sink_connected = true; filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL); @@ -1115,11 +1120,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_decodebin_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_decodebin_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL); @@ -1550,7 +1551,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This) if (!This->sink_connected) return S_OK; - unix_funcs->wg_parser_disconnect(This->wg_parser); + unix_funcs->wg_parser_destroy(This->wg_parser); /* read_thread() needs to stay alive to service any read requests GStreamer * sends, so we can only shut it down after GStreamer stops. */ @@ -1646,11 +1647,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_wave_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_wave_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL); @@ -1732,11 +1729,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_avi_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_avi_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL); @@ -1839,11 +1832,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_mpeg_audio_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_mpeg_audio_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 6b6b033b879..08823d52101 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -55,8 +55,8 @@ struct wg_parser pthread_mutex_t mutex; - pthread_cond_t init_cond; - bool no_more_pads, has_duration, error; + pthread_cond_t state_cond; + bool no_more_pads, has_duration, error, close_reader; pthread_cond_t read_cond; struct @@ -66,7 +66,7 @@ struct wg_parser bool pending; } read_request; - bool flushing, sink_connected; + bool flushing; }; struct wg_parser_stream @@ -515,11 +515,13 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex); - while (parser->sink_connected && !parser->read_request.pending) + while (!parser->close_reader && !parser->read_request.pending) pthread_cond_wait(&parser->read_cond, &parser->mutex); - if (!parser->sink_connected) + if (parser->close_reader) { + parser->close_reader = false; + pthread_cond_signal(&parser->state_cond); pthread_mutex_unlock(&parser->mutex); return false; } @@ -543,13 +545,10 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex); - if (parser->sink_connected) - { - error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); - message = gst_message_new_error(NULL, error, ""); - gst_bus_post(parser->bus, message); - parser->read_request.pending = false; - } + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); + message = gst_message_new_error(NULL, error, ""); + gst_bus_post(parser->bus, message); + parser->read_request.pending = false; pthread_mutex_unlock(&parser->mutex); return; @@ -559,8 +558,7 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex); - if (parser->sink_connected) - g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); parser->read_request.pending = false; pthread_mutex_unlock(&parser->mutex); @@ -577,14 +575,6 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, gst_buffer_fill(buffer, 0, data, size); pthread_mutex_lock(&parser->mutex); - - if (!parser->sink_connected) - { - pthread_mutex_unlock(&parser->mutex); - gst_buffer_unref(buffer); - return; - } - assert(parser->read_request.pending); GST_BUFFER_OFFSET(buffer) = parser->read_request.offset; @@ -798,7 +788,7 @@ static void no_more_pads_cb(GstElement *element, gpointer user) pthread_mutex_lock(&parser->mutex); parser->no_more_pads = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); } static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, @@ -885,7 +875,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) pthread_mutex_lock(&parser->mutex); stream->eos = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); } break; @@ -936,7 +926,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) wg_format_from_caps(&stream->preferred_format, caps); stream->has_caps = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; } @@ -1228,6 +1218,9 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user) unsigned int i; char *name; + if (!GST_PAD_IS_SRC(pad)) + return; + GST_LOG("parser %p, element %p, pad %p.", parser, element, pad); for (i = 0; i < parser->stream_count; ++i) @@ -1322,7 +1315,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->error = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; case GST_MESSAGE_WARNING: @@ -1337,7 +1330,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->has_duration = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; default: @@ -1352,8 +1345,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s unsigned int i; int ret; - parser->sink_connected = true; - if (!parser->bus) { parser->bus = gst_bus_new(); @@ -1376,24 +1367,24 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s parser->error = false; if (!parser->init_gst(parser)) - goto out; + return E_FAIL; gst_element_set_state(parser->container, GST_STATE_PAUSED); ret = gst_element_get_state(parser->container, NULL, NULL, -1); if (ret == GST_STATE_CHANGE_FAILURE) { GST_ERROR("Failed to play stream.\n"); - goto out; + return E_FAIL; } pthread_mutex_lock(&parser->mutex); while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); if (parser->error) { pthread_mutex_unlock(&parser->mutex); - goto out; + return E_FAIL; } for (i = 0; i < parser->stream_count; ++i) @@ -1402,7 +1393,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s gint64 duration; while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); /* GStreamer doesn't actually provide any guarantees about when duration * is available, even for seekable streams. It's basically built for @@ -1433,7 +1424,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s if (parser->error) { pthread_mutex_unlock(&parser->mutex); - goto out; + return E_FAIL; } if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) { @@ -1461,7 +1452,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s } else { - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); } } } @@ -1469,62 +1460,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s pthread_mutex_unlock(&parser->mutex); return S_OK; - -out: - if (parser->container) - gst_element_set_state(parser->container, GST_STATE_NULL); - - for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; - - if (parser->container) - { - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; - } - - pthread_mutex_lock(&parser->mutex); - parser->sink_connected = false; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_cond); - - return E_FAIL; -} - -static void CDECL wg_parser_disconnect(struct wg_parser *parser) -{ - unsigned int i; - - /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); - - gst_element_set_state(parser->container, GST_STATE_NULL); - - pthread_mutex_lock(&parser->mutex); - parser->sink_connected = false; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_cond); - - for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); - - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; - - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; } static BOOL decodebin_parser_init_gst(struct wg_parser *parser) @@ -1656,7 +1591,7 @@ static struct wg_parser *wg_parser_create(void) return NULL; pthread_mutex_init(&parser->mutex, NULL); - pthread_cond_init(&parser->init_cond, NULL); + pthread_cond_init(&parser->state_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); parser->flushing = true; @@ -1702,14 +1637,43 @@ static struct wg_parser * CDECL wg_wave_parser_create(void) static void CDECL wg_parser_destroy(struct wg_parser *parser) { - if (parser->bus) + unsigned int i; + + pthread_mutex_lock(&parser->mutex); + parser->close_reader = true; + pthread_cond_signal(&parser->read_cond); + while (parser->close_reader) + pthread_cond_wait(&parser->state_cond, &parser->mutex); + pthread_mutex_unlock(&parser->mutex); + + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) + { + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex); + + if (parser->container) + gst_element_set_state(parser->container, GST_STATE_NULL); + + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]); + + free(parser->streams); + + if (parser->container) { - gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); - gst_object_unref(parser->bus); + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); } + gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); + gst_object_unref(parser->bus); + pthread_mutex_destroy(&parser->mutex); - pthread_cond_destroy(&parser->init_cond); + pthread_cond_destroy(&parser->state_cond); pthread_cond_destroy(&parser->read_cond); free(parser); @@ -1724,7 +1688,6 @@ static const struct unix_funcs funcs = wg_parser_destroy, wg_parser_connect, - wg_parser_disconnect, wg_parser_begin_flush, wg_parser_end_flush, -- 2.33.0