From: Derek Lesho Subject: [PATCH 2/3] winegstreamer: Merge wg_parser_disconnect and wg_parser_destroy. Message-Id: <20210916210047.1845028-2-dlesho@codeweavers.com> Date: Thu, 16 Sep 2021 17:00:46 -0400 In-Reply-To: <20210916210047.1845028-1-dlesho@codeweavers.com> References: <20210916210047.1845028-1-dlesho@codeweavers.com> Signed-off-by: Derek Lesho --- dlls/winegstreamer/gst_private.h | 1 - dlls/winegstreamer/media_source.c | 17 ++--- dlls/winegstreamer/quartz_parser.c | 39 ++++------ dlls/winegstreamer/wg_parser.c | 119 +++++++++++++---------------- 4 files changed, 70 insertions(+), 106 deletions(-) diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 49e06b31369..22d9547ed72 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -165,7 +165,6 @@ struct unix_funcs void (CDECL *wg_parser_destroy)(struct wg_parser *parser); HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); - void (CDECL *wg_parser_disconnect)(struct wg_parser *parser); void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser); diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 825bad8da27..6c2bf92e2a2 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -103,7 +103,6 @@ struct media_source LONGLONG start_time; HANDLE read_thread; - bool read_thread_shutdown; }; static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) @@ -538,7 +537,7 @@ static DWORD CALLBACK read_thread(void *arg) TRACE("Starting read thread for media source %p.\n", source); - while (!source->read_thread_shutdown) + for (;;) { uint64_t offset; ULONG ret_size; @@ -546,7 +545,7 @@ static DWORD CALLBACK read_thread(void *arg) HRESULT hr; if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) - continue; + break; if (offset >= file_size) size = 0; @@ -1234,9 +1233,8 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) source->state = SOURCE_SHUTDOWN; - unix_funcs->wg_parser_disconnect(source->wg_parser); + unix_funcs->wg_parser_destroy(source->wg_parser); - source->read_thread_shutdown = true; WaitForSingleObject(source->read_thread, INFINITE); CloseHandle(source->read_thread); @@ -1257,8 +1255,6 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) IMFMediaStream_Release(&stream->IMFMediaStream_iface); } - unix_funcs->wg_parser_destroy(source->wg_parser); - free(source->streams); MFUnlockWorkQueue(source->async_commands_queue); @@ -1426,16 +1422,13 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ free(stream); } free(object->streams); - if (stream_count != UINT_MAX) - unix_funcs->wg_parser_disconnect(object->wg_parser); + if (object->wg_parser) + unix_funcs->wg_parser_destroy(object->wg_parser); if (object->read_thread) { - object->read_thread_shutdown = true; WaitForSingleObject(object->read_thread, INFINITE); CloseHandle(object->read_thread); } - if (object->wg_parser) - unix_funcs->wg_parser_destroy(object->wg_parser); if (object->async_commands_queue) MFUnlockWorkQueue(object->async_commands_queue); if (object->event_queue) diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index a8e7e3d979f..bf69a881d57 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -59,6 +59,7 @@ struct parser HANDLE read_thread; + struct wg_parser * (*parser_create)(void); BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -793,14 +794,14 @@ static DWORD CALLBACK read_thread(void *arg) TRACE("Starting read thread for filter %p.\n", filter); - while (filter->sink_connected) + for(;;) { uint64_t offset; uint32_t size; HRESULT hr; if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) - continue; + break; if (offset >= file_size) size = 0; @@ -869,8 +870,6 @@ static void parser_destroy(struct strmbase_filter *iface) IAsyncReader_Release(filter->reader); filter->reader = NULL; - unix_funcs->wg_parser_destroy(filter->wg_parser); - strmbase_sink_cleanup(&filter->sink); strmbase_filter_cleanup(&filter->filter); free(filter); @@ -975,6 +974,12 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons IAsyncReader_Length(filter->reader, &file_size, &unused); + if (!(filter->wg_parser = filter->parser_create())) + { + hr = E_OUTOFMEMORY; + goto err; + } + filter->sink_connected = true; filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL); @@ -1115,11 +1120,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_decodebin_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_decodebin_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL); @@ -1550,7 +1551,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This) if (!This->sink_connected) return S_OK; - unix_funcs->wg_parser_disconnect(This->wg_parser); + unix_funcs->wg_parser_destroy(This->wg_parser); /* read_thread() needs to stay alive to service any read requests GStreamer * sends, so we can only shut it down after GStreamer stops. */ @@ -1646,11 +1647,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_wave_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_wave_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL); @@ -1732,11 +1729,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_avi_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_avi_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL); @@ -1839,11 +1832,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_mpeg_audio_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_mpeg_audio_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 6b6b033b879..5ab7991b0f2 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -55,8 +55,8 @@ struct wg_parser pthread_mutex_t mutex; - pthread_cond_t init_cond; - bool no_more_pads, has_duration, error; + pthread_cond_t state_cond; + bool no_more_pads, has_duration, error, shutdown; pthread_cond_t read_cond; struct @@ -515,12 +515,13 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex); - while (parser->sink_connected && !parser->read_request.pending) + while (!parser->shutdown && !parser->read_request.pending) pthread_cond_wait(&parser->read_cond, &parser->mutex); - if (!parser->sink_connected) + if (parser->shutdown) { pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->state_cond); return false; } @@ -543,13 +544,10 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex); - if (parser->sink_connected) - { - error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); - message = gst_message_new_error(NULL, error, ""); - gst_bus_post(parser->bus, message); - parser->read_request.pending = false; - } + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); + message = gst_message_new_error(NULL, error, ""); + gst_bus_post(parser->bus, message); + parser->read_request.pending = false; pthread_mutex_unlock(&parser->mutex); return; @@ -559,8 +557,7 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex); - if (parser->sink_connected) - g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); parser->read_request.pending = false; pthread_mutex_unlock(&parser->mutex); @@ -577,14 +574,6 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, gst_buffer_fill(buffer, 0, data, size); pthread_mutex_lock(&parser->mutex); - - if (!parser->sink_connected) - { - pthread_mutex_unlock(&parser->mutex); - gst_buffer_unref(buffer); - return; - } - assert(parser->read_request.pending); GST_BUFFER_OFFSET(buffer) = parser->read_request.offset; @@ -798,7 +787,7 @@ static void no_more_pads_cb(GstElement *element, gpointer user) pthread_mutex_lock(&parser->mutex); parser->no_more_pads = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); } static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, @@ -885,7 +874,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) pthread_mutex_lock(&parser->mutex); stream->eos = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); } break; @@ -936,7 +925,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) wg_format_from_caps(&stream->preferred_format, caps); stream->has_caps = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; } @@ -1322,7 +1311,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->error = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; case GST_MESSAGE_WARNING: @@ -1337,7 +1326,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->has_duration = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; default: @@ -1352,8 +1341,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s unsigned int i; int ret; - parser->sink_connected = true; - if (!parser->bus) { parser->bus = gst_bus_new(); @@ -1402,7 +1389,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s gint64 duration; while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); /* GStreamer doesn't actually provide any guarantees about when duration * is available, even for seekable streams. It's basically built for @@ -1461,13 +1448,14 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s } else { - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); } } } pthread_mutex_unlock(&parser->mutex); + parser->sink_connected = true; return S_OK; out: @@ -1495,38 +1483,6 @@ out: return E_FAIL; } -static void CDECL wg_parser_disconnect(struct wg_parser *parser) -{ - unsigned int i; - - /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); - - gst_element_set_state(parser->container, GST_STATE_NULL); - - pthread_mutex_lock(&parser->mutex); - parser->sink_connected = false; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_cond); - - for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); - - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; - - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; -} - static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element; @@ -1656,7 +1612,7 @@ static struct wg_parser *wg_parser_create(void) return NULL; pthread_mutex_init(&parser->mutex, NULL); - pthread_cond_init(&parser->init_cond, NULL); + pthread_cond_init(&parser->state_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); parser->flushing = true; @@ -1702,14 +1658,42 @@ static struct wg_parser * CDECL wg_wave_parser_create(void) static void CDECL wg_parser_destroy(struct wg_parser *parser) { - if (parser->bus) + unsigned int i; + + pthread_mutex_lock(&parser->mutex); + parser->shutdown = true; + pthread_cond_signal(&parser->read_cond); + pthread_cond_wait(&parser->state_cond, &parser->mutex); + pthread_mutex_unlock(&parser->mutex); + + if (parser->sink_connected) { - gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); - gst_object_unref(parser->bus); + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) + { + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex); + + gst_element_set_state(parser->container, GST_STATE_NULL); + + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]); + + parser->stream_count = 0; + free(parser->streams); + + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); } + gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); + gst_object_unref(parser->bus); + pthread_mutex_destroy(&parser->mutex); - pthread_cond_destroy(&parser->init_cond); + pthread_cond_destroy(&parser->state_cond); pthread_cond_destroy(&parser->read_cond); free(parser); @@ -1724,7 +1708,6 @@ static const struct unix_funcs funcs = wg_parser_destroy, wg_parser_connect, - wg_parser_disconnect, wg_parser_begin_flush, wg_parser_end_flush, -- 2.33.0