From: Derek Lesho Subject: [PATCH v2 3/6] winegstreamer: Defer blocking on GStreamer initialization until getting a stream. Message-Id: <20210917195858.42475-3-dlesho@codeweavers.com> Date: Fri, 17 Sep 2021 15:58:55 -0400 In-Reply-To: <20210917195858.42475-1-dlesho@codeweavers.com> References: <20210917195858.42475-1-dlesho@codeweavers.com> This commit has several purposes. - It will allow wg_parser_create and wg_parser_connect to be merged, as the functionality of both functions now can be run before the read thread starts. - It also allows us to simplify a future push-mode client where there is no separate read thread, and we want to send buffers and check for a response from the same thread which initialized the object. Signed-off-by: Derek Lesho --- dlls/winegstreamer/media_source.c | 6 +- dlls/winegstreamer/quartz_parser.c | 20 +-- dlls/winegstreamer/wg_parser.c | 196 +++++++++++++++-------------- 3 files changed, 123 insertions(+), 99 deletions(-) diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 6c2bf92e2a2..21e5f6c5d5b 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -1346,7 +1346,11 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ * leak occurs with native. */ unix_funcs->wg_parser_set_unlimited_buffering(parser); - stream_count = unix_funcs->wg_parser_get_stream_count(parser); + if (!(stream_count = unix_funcs->wg_parser_get_stream_count(parser))) + { + hr = E_FAIL; + goto fail; + } if (!(object->streams = calloc(stream_count, sizeof(*object->streams)))) { diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index bf69a881d57..20000f63eb7 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -1039,7 +1039,7 @@ static BOOL decodebin_parser_filter_init_gst(struct parser *filter) return FALSE; } - return TRUE; + return !!stream_count; } static HRESULT decodebin_parser_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt) @@ -1603,11 +1603,12 @@ static const struct strmbase_sink_ops wave_parser_sink_ops = static BOOL wave_parser_filter_init_gst(struct parser *filter) { struct wg_parser *parser = filter->wg_parser; + struct wg_parser_stream *stream = unix_funcs->wg_parser_get_stream(parser, 0); - if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, 0), L"output")) + if (!stream) return FALSE; - return TRUE; + return !!create_pin(filter, stream, L"output"); } static HRESULT wave_parser_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt) @@ -1678,18 +1679,21 @@ static const struct strmbase_sink_ops avi_splitter_sink_ops = static BOOL avi_splitter_filter_init_gst(struct parser *filter) { struct wg_parser *parser = filter->wg_parser; + struct wg_parser_stream *stream; uint32_t i, stream_count; WCHAR source_name[20]; stream_count = unix_funcs->wg_parser_get_stream_count(parser); for (i = 0; i < stream_count; ++i) { + if (!(stream = unix_funcs->wg_parser_get_stream(parser, i))) + return FALSE; swprintf(source_name, ARRAY_SIZE(source_name), L"Stream %02u", i); - if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, i), source_name)) + if (!create_pin(filter, stream, source_name)) return FALSE; } - return TRUE; + return !!stream_count; } static HRESULT avi_splitter_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt) @@ -1765,11 +1769,13 @@ static const struct strmbase_sink_ops mpeg_splitter_sink_ops = static BOOL mpeg_splitter_filter_init_gst(struct parser *filter) { struct wg_parser *parser = filter->wg_parser; + struct wg_parser_stream *stream = unix_funcs->wg_parser_get_stream(parser, 0); - if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, 0), L"Audio")) + if (!stream) return FALSE; - return TRUE; + return !!create_pin(filter, stream, L"Audio"); + return FALSE; } static HRESULT mpeg_splitter_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt) diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 08823d52101..98b7491998b 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -478,13 +478,113 @@ static bool wg_format_compare(const struct wg_format *a, const struct wg_format return false; } +static bool wg_parser_initialize(struct wg_parser *parser) +{ + unsigned int i; + int ret; + + ret = gst_element_get_state(parser->container, NULL, NULL, -1); + if (ret == GST_STATE_CHANGE_FAILURE) + { + GST_ERROR("Failed to play stream.\n"); + return 0; + } + + pthread_mutex_lock(&parser->mutex); + + while (!parser->no_more_pads && !parser->error) + pthread_cond_wait(&parser->state_cond, &parser->mutex); + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + return 0; + } + + for (i = 0; i < parser->stream_count; ++i) + { + struct wg_parser_stream *stream = parser->streams[i]; + gint64 duration; + + while (!stream->has_caps && !parser->error) + pthread_cond_wait(&parser->state_cond, &parser->mutex); + + /* GStreamer doesn't actually provide any guarantees about when duration + * is available, even for seekable streams. It's basically built for + * applications that don't care, e.g. movie players that can display + * a duration once it's available, and update it visually if a better + * estimate is found. This doesn't really match well with DirectShow or + * Media Foundation, which both expect duration to be available + * immediately on connecting, so we have to use some complex heuristics + * to try to actually get a usable duration. + * + * Some elements (avidemux, wavparse, qtdemux) record duration almost + * immediately, before fixing caps. Such elements don't send + * duration-changed messages. Therefore always try querying duration + * after caps have been found. + * + * Some elements (mpegaudioparse) send duration-changed. In the case of + * a mp3 stream without seek tables it will not be sent immediately, but + * only after enough frames have been parsed to form an estimate. They + * may send it multiple times with increasingly accurate estimates, but + * unfortunately we have no way of knowing whether another estimate will + * be sent, so we always take the first one. We assume that if the + * duration is not immediately available then the element will always + * send duration-changed. + */ + + for (;;) + { + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + return 0; + } + if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) + { + stream->duration = duration / 100; + break; + } + + if (stream->eos) + { + stream->duration = 0; + GST_WARNING("Failed to query duration.\n"); + break; + } + + /* Elements based on GstBaseParse send duration-changed before + * actually updating the duration in GStreamer versions prior + * to 1.17.1. See . So after + * receiving duration-changed we have to continue polling until + * the query succeeds. */ + if (parser->has_duration) + { + pthread_mutex_unlock(&parser->mutex); + g_usleep(10000); + pthread_mutex_lock(&parser->mutex); + } + else + { + pthread_cond_wait(&parser->state_cond, &parser->mutex); + } + } + } + + pthread_mutex_unlock(&parser->mutex); + return 1; +} + static uint32_t CDECL wg_parser_get_stream_count(struct wg_parser *parser) { + if (!wg_parser_initialize(parser)) + return 0; return parser->stream_count; } static struct wg_parser_stream * CDECL wg_parser_get_stream(struct wg_parser *parser, uint32_t index) { + if (!wg_parser_initialize(parser)) + return NULL; return parser->streams[index]; } @@ -492,6 +592,9 @@ static void CDECL wg_parser_begin_flush(struct wg_parser *parser) { unsigned int i; + if (!wg_parser_initialize(parser)) + return; + pthread_mutex_lock(&parser->mutex); parser->flushing = true; pthread_mutex_unlock(&parser->mutex); @@ -1342,9 +1445,6 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) { - unsigned int i; - int ret; - if (!parser->bus) { parser->bus = gst_bus_new(); @@ -1370,94 +1470,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s return E_FAIL; gst_element_set_state(parser->container, GST_STATE_PAUSED); - ret = gst_element_get_state(parser->container, NULL, NULL, -1); - if (ret == GST_STATE_CHANGE_FAILURE) - { - GST_ERROR("Failed to play stream.\n"); - return E_FAIL; - } - - pthread_mutex_lock(&parser->mutex); - - while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->state_cond, &parser->mutex); - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - return E_FAIL; - } - - for (i = 0; i < parser->stream_count; ++i) - { - struct wg_parser_stream *stream = parser->streams[i]; - gint64 duration; - - while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->state_cond, &parser->mutex); - - /* GStreamer doesn't actually provide any guarantees about when duration - * is available, even for seekable streams. It's basically built for - * applications that don't care, e.g. movie players that can display - * a duration once it's available, and update it visually if a better - * estimate is found. This doesn't really match well with DirectShow or - * Media Foundation, which both expect duration to be available - * immediately on connecting, so we have to use some complex heuristics - * to try to actually get a usable duration. - * - * Some elements (avidemux, wavparse, qtdemux) record duration almost - * immediately, before fixing caps. Such elements don't send - * duration-changed messages. Therefore always try querying duration - * after caps have been found. - * - * Some elements (mpegaudioparse) send duration-changed. In the case of - * a mp3 stream without seek tables it will not be sent immediately, but - * only after enough frames have been parsed to form an estimate. They - * may send it multiple times with increasingly accurate estimates, but - * unfortunately we have no way of knowing whether another estimate will - * be sent, so we always take the first one. We assume that if the - * duration is not immediately available then the element will always - * send duration-changed. - */ - - for (;;) - { - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - return E_FAIL; - } - if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) - { - stream->duration = duration / 100; - break; - } - - if (stream->eos) - { - stream->duration = 0; - GST_WARNING("Failed to query duration.\n"); - break; - } - - /* Elements based on GstBaseParse send duration-changed before - * actually updating the duration in GStreamer versions prior - * to 1.17.1. See . So after - * receiving duration-changed we have to continue polling until - * the query succeeds. */ - if (parser->has_duration) - { - pthread_mutex_unlock(&parser->mutex); - g_usleep(10000); - pthread_mutex_lock(&parser->mutex); - } - else - { - pthread_cond_wait(&parser->state_cond, &parser->mutex); - } - } - } - - pthread_mutex_unlock(&parser->mutex); return S_OK; } @@ -1639,6 +1651,8 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) { unsigned int i; + wg_parser_initialize(parser); + pthread_mutex_lock(&parser->mutex); parser->close_reader = true; pthread_cond_signal(&parser->read_cond); -- 2.33.0