From: Derek Lesho Subject: [RFC PATCH 5/5] winegstreamer: Merge parser creation functions and wg_parser_connect. Message-Id: <20210901210558.892101-5-dlesho@codeweavers.com> Date: Wed, 1 Sep 2021 17:05:58 -0400 In-Reply-To: <20210901210558.892101-1-dlesho@codeweavers.com> References: <20210901210558.892101-1-dlesho@codeweavers.com> --- dlls/winegstreamer/gst_private.h | 17 +- dlls/winegstreamer/media_source.c | 24 ++- dlls/winegstreamer/quartz_parser.c | 29 +-- dlls/winegstreamer/wg_parser.c | 308 +++++++++++++---------------- 4 files changed, 183 insertions(+), 195 deletions(-) diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 36d88f7b723..f6c6eff1e0c 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -155,16 +155,21 @@ struct wg_parser_event }; C_ASSERT(sizeof(struct wg_parser_event) == 40); +enum wg_parser_type +{ + WG_DECODEBIN_PARSER, + WG_AVI_PARSER, + WG_MPEG_AUDIO_PARSER, + WG_WAVE_PARSER, +}; + +struct wg_parser; + struct unix_funcs { - struct wg_parser *(CDECL *wg_decodebin_parser_create)(void); - struct wg_parser *(CDECL *wg_avi_parser_create)(void); - struct wg_parser *(CDECL *wg_mpeg_audio_parser_create)(void); - struct wg_parser *(CDECL *wg_wave_parser_create)(void); + HRESULT (CDECL *wg_parser_create)(enum wg_parser_type parser_type, uint64_t file_size, struct wg_parser **parser); void (CDECL *wg_parser_destroy)(struct wg_parser *parser); - HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); - void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser); diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 75b3b399f4b..928ee63b649 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -530,6 +530,12 @@ static DWORD CALLBACK read_thread(void *arg) struct media_source *source = arg; IMFByteStream *byte_stream = source->byte_stream; + while (!source->wg_parser && source->state != SOURCE_SHUTDOWN) + continue; + + if (source->state == SOURCE_SHUTDOWN) + return 0; + TRACE("Starting read thread for media source %p.\n", source); for(;;) @@ -1321,19 +1327,17 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ if (FAILED(hr = MFAllocateWorkQueue(&object->async_commands_queue))) goto fail; - if (!(parser = unix_funcs->wg_decodebin_parser_create())) - { - hr = E_OUTOFMEMORY; - goto fail; - } - object->wg_parser = parser; - - object->read_thread = CreateThread(NULL, 0, read_thread, object, 0, NULL); - object->state = SOURCE_OPENING; + object->read_thread = CreateThread(NULL, 0, read_thread, object, 0, NULL); - if (FAILED(hr = unix_funcs->wg_parser_connect(parser, file_size))) + if (FAILED(hr = unix_funcs->wg_parser_create(WG_DECODEBIN_PARSER, file_size, &object->wg_parser))) + { + object->state = SOURCE_SHUTDOWN; + WaitForSingleObject(object->read_thread, INFINITE); + CloseHandle(object->read_thread); goto fail; + } + parser = object->wg_parser; /* In Media Foundation, sources may read from any media source stream * without fear of blocking due to buffering limits on another. Trailmakers, diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 3e35e7d1503..a5293394202 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -59,7 +59,7 @@ struct parser HANDLE read_thread; - struct wg_parser * (*parser_create)(void); + enum wg_parser_type parser_type; BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -787,6 +787,12 @@ static DWORD CALLBACK read_thread(void *arg) { struct parser *filter = arg; + while (!filter->wg_parser && filter->sink_connected) + continue; + + if (!filter->sink_connected) + return 0; + TRACE("Starting read thread for filter %p.\n", filter); for(;;) @@ -958,17 +964,16 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons IAsyncReader_Length(filter->reader, &file_size, &unused); - if (!(filter->wg_parser = filter->parser_create())) - { - hr = E_OUTOFMEMORY; - goto err; - } - filter->sink_connected = true; filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL); - if (FAILED(hr = unix_funcs->wg_parser_connect(filter->wg_parser, file_size))) + if (FAILED(hr = unix_funcs->wg_parser_create(filter->parser_type, file_size, &filter->wg_parser))) + { + filter->sink_connected = false; + WaitForSingleObject(filter->read_thread, INFINITE); + CloseHandle(filter->read_thread); goto err; + } if (!filter->init_gst(filter)) goto err; @@ -1101,7 +1106,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - object->parser_create = unix_funcs->wg_decodebin_parser_create; + object->parser_type = WG_DECODEBIN_PARSER; strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL); @@ -1628,7 +1633,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - object->parser_create = unix_funcs->wg_wave_parser_create; + object->parser_type = WG_WAVE_PARSER; strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL); @@ -1710,7 +1715,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - object->parser_create = unix_funcs->wg_avi_parser_create; + object->parser_type = WG_AVI_PARSER; strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL); @@ -1813,7 +1818,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY; - object->parser_create = unix_funcs->wg_mpeg_audio_parser_create; + object->parser_type = WG_MPEG_AUDIO_PARSER; strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index d29298632a4..7726e6aca30 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -45,8 +45,6 @@ GST_DEBUG_CATEGORY_STATIC(wine); struct wg_parser { - BOOL (*init_gst)(struct wg_parser *parser); - struct wg_parser_stream **streams; unsigned int stream_count; @@ -69,7 +67,7 @@ struct wg_parser uint32_t size; } read_request; - bool flushing, sink_connected; + bool flushing; }; struct wg_parser_stream @@ -1290,113 +1288,6 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use return GST_BUS_DROP; } -static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) -{ - unsigned int i; - - parser->file_size = file_size; - - if (!parser->bus) - { - parser->bus = gst_bus_new(); - gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL); - } - - parser->container = gst_bin_new(NULL); - gst_element_set_bus(parser->container, parser->bus); - - if (!(parser->appsrc = create_element("appsrc", "base"))) - return E_FAIL; - gst_bin_add(GST_BIN(parser->container), parser->appsrc); - - g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); - g_object_set(parser->appsrc, "size", parser->file_size, NULL); - g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); - g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser); - - parser->next_pull_offset = 0; - - if (!parser->init_gst(parser)) - return E_FAIL; - - pthread_mutex_lock(&parser->mutex); - - for (i = 0; i < parser->stream_count; ++i) - { - struct wg_parser_stream *stream = parser->streams[i]; - gint64 duration; - - while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->state_cond, &parser->mutex); - - /* GStreamer doesn't actually provide any guarantees about when duration - * is available, even for seekable streams. It's basically built for - * applications that don't care, e.g. movie players that can display - * a duration once it's available, and update it visually if a better - * estimate is found. This doesn't really match well with DirectShow or - * Media Foundation, which both expect duration to be available - * immediately on connecting, so we have to use some complex heuristics - * to try to actually get a usable duration. - * - * Some elements (avidemux, wavparse, qtdemux) record duration almost - * immediately, before fixing caps. Such elements don't send - * duration-changed messages. Therefore always try querying duration - * after caps have been found. - * - * Some elements (mpegaudioparse) send duration-changed. In the case of - * a mp3 stream without seek tables it will not be sent immediately, but - * only after enough frames have been parsed to form an estimate. They - * may send it multiple times with increasingly accurate estimates, but - * unfortunately we have no way of knowing whether another estimate will - * be sent, so we always take the first one. We assume that if the - * duration is not immediately available then the element will always - * send duration-changed. - */ - - for (;;) - { - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - return E_FAIL; - } - if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) - { - stream->duration = duration / 100; - break; - } - - if (stream->eos) - { - stream->duration = 0; - GST_WARNING("Failed to query duration.\n"); - break; - } - - /* Elements based on GstBaseParse send duration-changed before - * actually updating the duration in GStreamer versions prior - * to 1.17.1. See . So after - * receiving duration-changed we have to continue polling until - * the query succeeds. */ - if (parser->has_duration) - { - pthread_mutex_unlock(&parser->mutex); - g_usleep(10000); - pthread_mutex_lock(&parser->mutex); - } - else - { - pthread_cond_wait(&parser->state_cond, &parser->mutex); - } - } - } - - pthread_mutex_unlock(&parser->mutex); - - parser->sink_connected = true; - return S_OK; -} - static void free_stream(struct wg_parser_stream *stream) { if (stream->their_src) @@ -1592,57 +1483,148 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser) return TRUE; } -static struct wg_parser *wg_parser_create(void) +static void CDECL wg_parser_destroy(struct wg_parser *parser); + +static HRESULT CDECL wg_parser_create(enum wg_parser_type parser_type, uint64_t file_size, struct wg_parser **parser_out) { struct wg_parser *parser; + unsigned int i; if (!(parser = calloc(1, sizeof(*parser)))) - return NULL; + return E_OUTOFMEMORY; pthread_mutex_init(&parser->mutex, NULL); pthread_cond_init(&parser->state_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); parser->flushing = true; parser->read_request.offset = -1; + parser->file_size = file_size; - GST_DEBUG("Created winegstreamer parser %p.\n", parser); - return parser; -} + *parser_out = parser; -static struct wg_parser * CDECL wg_decodebin_parser_create(void) -{ - struct wg_parser *parser; + if (!parser->bus) + { + parser->bus = gst_bus_new(); + gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL); + } - if ((parser = wg_parser_create())) - parser->init_gst = decodebin_parser_init_gst; - return parser; -} + parser->container = gst_bin_new(NULL); + gst_element_set_bus(parser->container, parser->bus); -static struct wg_parser * CDECL wg_avi_parser_create(void) -{ - struct wg_parser *parser; + if (!(parser->appsrc = create_element("appsrc", "base"))) + goto fail; + gst_bin_add(GST_BIN(parser->container), parser->appsrc); - if ((parser = wg_parser_create())) - parser->init_gst = avi_parser_init_gst; - return parser; -} + g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); + g_object_set(parser->appsrc, "size", parser->file_size, NULL); + g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); + g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser); -static struct wg_parser * CDECL wg_mpeg_audio_parser_create(void) -{ - struct wg_parser *parser; + parser->next_pull_offset = 0; - if ((parser = wg_parser_create())) - parser->init_gst = mpeg_audio_parser_init_gst; - return parser; -} + switch (parser_type) + { + case WG_DECODEBIN_PARSER: + if (!decodebin_parser_init_gst(parser)) + goto fail; + break; + case WG_AVI_PARSER: + if (!avi_parser_init_gst(parser)) + goto fail; + break; + case WG_MPEG_AUDIO_PARSER: + if (!mpeg_audio_parser_init_gst(parser)) + goto fail; + break; + case WG_WAVE_PARSER: + if (!wave_parser_init_gst(parser)) + goto fail; + break; + default: + assert(0); + } -static struct wg_parser * CDECL wg_wave_parser_create(void) -{ - struct wg_parser *parser; + pthread_mutex_lock(&parser->mutex); - if ((parser = wg_parser_create())) - parser->init_gst = wave_parser_init_gst; - return parser; + for (i = 0; i < parser->stream_count; ++i) + { + struct wg_parser_stream *stream = parser->streams[i]; + gint64 duration; + + while (!stream->has_caps && !parser->error) + pthread_cond_wait(&parser->state_cond, &parser->mutex); + + /* GStreamer doesn't actually provide any guarantees about when duration + * is available, even for seekable streams. It's basically built for + * applications that don't care, e.g. movie players that can display + * a duration once it's available, and update it visually if a better + * estimate is found. This doesn't really match well with DirectShow or + * Media Foundation, which both expect duration to be available + * immediately on connecting, so we have to use some complex heuristics + * to try to actually get a usable duration. + * + * Some elements (avidemux, wavparse, qtdemux) record duration almost + * immediately, before fixing caps. Such elements don't send + * duration-changed messages. Therefore always try querying duration + * after caps have been found. + * + * Some elements (mpegaudioparse) send duration-changed. In the case of + * a mp3 stream without seek tables it will not be sent immediately, but + * only after enough frames have been parsed to form an estimate. They + * may send it multiple times with increasingly accurate estimates, but + * unfortunately we have no way of knowing whether another estimate will + * be sent, so we always take the first one. We assume that if the + * duration is not immediately available then the element will always + * send duration-changed. + */ + + for (;;) + { + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + goto fail; + } + if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) + { + stream->duration = duration / 100; + break; + } + + if (stream->eos) + { + stream->duration = 0; + GST_WARNING("Failed to query duration.\n"); + break; + } + + /* Elements based on GstBaseParse send duration-changed before + * actually updating the duration in GStreamer versions prior + * to 1.17.1. See . So after + * receiving duration-changed we have to continue polling until + * the query succeeds. */ + if (parser->has_duration) + { + pthread_mutex_unlock(&parser->mutex); + g_usleep(10000); + pthread_mutex_lock(&parser->mutex); + } + else + { + pthread_cond_wait(&parser->state_cond, &parser->mutex); + } + } + } + + pthread_mutex_unlock(&parser->mutex); + + GST_DEBUG("Created winegstreamer parser %p.\n", parser); + return S_OK; + + fail: + wg_parser_destroy(parser); + *parser_out = NULL; + return E_FAIL; } static void CDECL wg_parser_destroy(struct wg_parser *parser) @@ -1656,30 +1638,27 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) pthread_cond_wait(&parser->state_cond, &parser->mutex); pthread_mutex_unlock(&parser->mutex); - if (parser->sink_connected) + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) { - /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex); - gst_element_set_state(parser->container, GST_STATE_NULL); + gst_element_set_state(parser->container, GST_STATE_NULL); - for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]); - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; + parser->stream_count = 0; + free(parser->streams); + parser->streams = NULL; - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; - } + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); + parser->container = NULL; if (parser->bus) { @@ -1696,14 +1675,9 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) static const struct unix_funcs funcs = { - wg_decodebin_parser_create, - wg_avi_parser_create, - wg_mpeg_audio_parser_create, - wg_wave_parser_create, + wg_parser_create, wg_parser_destroy, - wg_parser_connect, - wg_parser_begin_flush, wg_parser_end_flush, -- 2.33.0