From: Derek Lesho Subject: [RFC PATCH 4/5] winegstreamer: Merge wg_parser_disconnect and wg_parser_destroy. Message-Id: <20210901210558.892101-4-dlesho@codeweavers.com> Date: Wed, 1 Sep 2021 17:05:57 -0400 In-Reply-To: <20210901210558.892101-1-dlesho@codeweavers.com> References: <20210901210558.892101-1-dlesho@codeweavers.com> Signed-off-by: Derek Lesho --- dlls/winegstreamer/gst_private.h | 1 - dlls/winegstreamer/media_source.c | 13 ++-- dlls/winegstreamer/quartz_parser.c | 39 ++++------- dlls/winegstreamer/wg_parser.c | 100 +++++++++++++++-------------- 4 files changed, 69 insertions(+), 84 deletions(-) diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 9943682facb..36d88f7b723 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -164,7 +164,6 @@ struct unix_funcs void (CDECL *wg_parser_destroy)(struct wg_parser *parser); HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); - void (CDECL *wg_parser_disconnect)(struct wg_parser *parser); void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser); diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index abd9a220a7f..75b3b399f4b 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -103,7 +103,6 @@ struct media_source LONGLONG start_time; HANDLE read_thread; - bool read_thread_shutdown; }; static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) @@ -533,7 +532,7 @@ static DWORD CALLBACK read_thread(void *arg) TRACE("Starting read thread for media source %p.\n", source); - while (!source->read_thread_shutdown) + for(;;) { uint64_t offset; ULONG ret_size; @@ -542,7 +541,7 @@ static DWORD CALLBACK read_thread(void *arg) void *data; if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) - continue; + break; data = malloc(size); ret_size = 0; @@ -1218,12 +1217,10 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) source->state = SOURCE_SHUTDOWN; - if (source->stream_count) - unix_funcs->wg_parser_disconnect(source->wg_parser); + unix_funcs->wg_parser_destroy(source->wg_parser); if (source->read_thread) { - source->read_thread_shutdown = true; WaitForSingleObject(source->read_thread, INFINITE); CloseHandle(source->read_thread); } @@ -1254,8 +1251,6 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) IMFMediaStream_Release(&stream->IMFMediaStream_iface); } - unix_funcs->wg_parser_destroy(source->wg_parser); - if (source->streams) free(source->streams); @@ -1402,7 +1397,7 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ fail: WARN("Failed to construct MFMediaSource, hr %#x.\n", hr); - if (object->wg_parser) + if (parser) IMFMediaSource_Shutdown(&object->IMFMediaSource_iface); free(descriptors); IMFMediaSource_Release(&object->IMFMediaSource_iface); diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index c85cbe4cf44..3e35e7d1503 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -59,6 +59,7 @@ struct parser HANDLE read_thread; + struct wg_parser * (*parser_create)(void); BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -788,7 +789,7 @@ static DWORD CALLBACK read_thread(void *arg) TRACE("Starting read thread for filter %p.\n", filter); - while (filter->sink_connected) + for(;;) { uint64_t offset; uint32_t size; @@ -796,7 +797,7 @@ static DWORD CALLBACK read_thread(void *arg) void *data; if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) - continue; + break; data = malloc(size); hr = IAsyncReader_SyncRead(filter->reader, offset, size, data); if (FAILED(hr)) @@ -853,8 +854,6 @@ static void parser_destroy(struct strmbase_filter *iface) IAsyncReader_Release(filter->reader); filter->reader = NULL; - unix_funcs->wg_parser_destroy(filter->wg_parser); - strmbase_sink_cleanup(&filter->sink); strmbase_filter_cleanup(&filter->filter); free(filter); @@ -959,6 +958,12 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons IAsyncReader_Length(filter->reader, &file_size, &unused); + if (!(filter->wg_parser = filter->parser_create())) + { + hr = E_OUTOFMEMORY; + goto err; + } + filter->sink_connected = true; filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL); @@ -1096,11 +1101,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_decodebin_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_decodebin_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL); @@ -1531,7 +1532,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This) if (!This->sink_connected) return S_OK; - unix_funcs->wg_parser_disconnect(This->wg_parser); + unix_funcs->wg_parser_destroy(This->wg_parser); /* read_thread() needs to stay alive to service any read requests GStreamer * sends, so we can only shut it down after GStreamer stops. */ @@ -1627,11 +1628,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_wave_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_wave_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL); @@ -1713,11 +1710,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_avi_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_avi_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL); @@ -1820,11 +1813,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - if (!(object->wg_parser = unix_funcs->wg_mpeg_audio_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_mpeg_audio_parser_create; strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 7f2627235f1..d29298632a4 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -58,8 +58,8 @@ struct wg_parser pthread_mutex_t mutex; - pthread_cond_t init_cond; - bool no_more_pads, has_duration, error; + pthread_cond_t state_cond; + bool no_more_pads, has_duration, error, shutdown; pthread_cond_t read_cond; struct @@ -518,12 +518,13 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex); - while (parser->sink_connected && parser->read_request.offset == -1) + while (!parser->shutdown && parser->read_request.offset == -1) pthread_cond_wait(&parser->read_cond, &parser->mutex); - if (!parser->sink_connected) + if (parser->shutdown) { pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->state_cond); return false; } @@ -779,7 +780,7 @@ static void no_more_pads_cb(GstElement *element, gpointer user) pthread_mutex_lock(&parser->mutex); parser->no_more_pads = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); } static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, @@ -866,7 +867,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) pthread_mutex_lock(&parser->mutex); stream->eos = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); } break; @@ -917,7 +918,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) wg_format_from_caps(&stream->preferred_format, caps); stream->has_caps = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; } @@ -1264,7 +1265,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->error = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; case GST_MESSAGE_WARNING: @@ -1279,7 +1280,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->has_duration = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; default: @@ -1294,7 +1295,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s unsigned int i; parser->file_size = file_size; - parser->sink_connected = true; if (!parser->bus) { @@ -1327,7 +1327,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s gint64 duration; while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); /* GStreamer doesn't actually provide any guarantees about when duration * is available, even for seekable streams. It's basically built for @@ -1386,13 +1386,14 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s } else { - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); } } } pthread_mutex_unlock(&parser->mutex); + parser->sink_connected = true; return S_OK; } @@ -1420,38 +1421,6 @@ static void free_stream(struct wg_parser_stream *stream) free(stream); } -static void CDECL wg_parser_disconnect(struct wg_parser *parser) -{ - unsigned int i; - - /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); - - gst_element_set_state(parser->container, GST_STATE_NULL); - - pthread_mutex_lock(&parser->mutex); - parser->sink_connected = false; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_cond); - - for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); - - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; - - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; -} - static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element; @@ -1488,7 +1457,7 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) pthread_mutex_lock(&parser->mutex); while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); if (parser->error) { pthread_mutex_unlock(&parser->mutex); @@ -1533,7 +1502,7 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser) pthread_mutex_lock(&parser->mutex); while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); if (parser->error) { pthread_mutex_unlock(&parser->mutex); @@ -1631,7 +1600,7 @@ static struct wg_parser *wg_parser_create(void) return NULL; pthread_mutex_init(&parser->mutex, NULL); - pthread_cond_init(&parser->init_cond, NULL); + pthread_cond_init(&parser->state_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); parser->flushing = true; parser->read_request.offset = -1; @@ -1678,6 +1647,40 @@ static struct wg_parser * CDECL wg_wave_parser_create(void) static void CDECL wg_parser_destroy(struct wg_parser *parser) { + unsigned int i; + + /* shut down read thread first to post-shutdown push_data */ + pthread_mutex_lock(&parser->mutex); + parser->shutdown = true; + pthread_cond_signal(&parser->read_cond); + pthread_cond_wait(&parser->state_cond, &parser->mutex); + pthread_mutex_unlock(&parser->mutex); + + if (parser->sink_connected) + { + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) + { + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex); + + gst_element_set_state(parser->container, GST_STATE_NULL); + + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]); + + parser->stream_count = 0; + free(parser->streams); + parser->streams = NULL; + + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); + parser->container = NULL; + } + if (parser->bus) { gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); @@ -1685,7 +1688,7 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) } pthread_mutex_destroy(&parser->mutex); - pthread_cond_destroy(&parser->init_cond); + pthread_cond_destroy(&parser->state_cond); pthread_cond_destroy(&parser->read_cond); free(parser); @@ -1700,7 +1703,6 @@ static const struct unix_funcs funcs = wg_parser_destroy, wg_parser_connect, - wg_parser_disconnect, wg_parser_begin_flush, wg_parser_end_flush, -- 2.33.0