From: Zebediah Figura Subject: [PATCH] Revert "winegstreamer: Replace source pad interface with GstAppSrc." Message-Id: <20210924041028.2058037-1-zfigura@codeweavers.com> Date: Thu, 23 Sep 2021 23:10:28 -0500 This reverts commit 1aa359a100bae859b278007e8bf90673eebd7db0. appsrc suffers from a rather problematic race condition surrounding flushes [1]. Essentially, it's possible for a flushing seek to begin and end between wg_parser_get_next_read_request and wg_parser_push_data. The race condition is not easy to fix, and in light of it it's not clear if we want to use appsrc. [1] https://www.winehq.org/pipermail/wine-devel/2021-September/196043.html Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=51774 Signed-off-by: Zebediah Figura --- Since the bug won't be fixed before 6.18, and since the way forward may not use appsrc anyway, revert the patch for now. dlls/winegstreamer/wg_parser.c | 429 ++++++++++++++++++++++++--------- 1 file changed, 310 insertions(+), 119 deletions(-) diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 6b6b033b879..f0815e37689 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -33,7 +33,6 @@ #include #include #include -#include /* GStreamer callbacks may be called on threads not created by Wine, and * therefore cannot access the Wine TEB. This means that we must use GStreamer @@ -50,20 +49,28 @@ struct wg_parser struct wg_parser_stream **streams; unsigned int stream_count; - GstElement *container, *appsrc, *decodebin; + GstElement *container, *decodebin; GstBus *bus; + GstPad *my_src, *their_sink; + + guint64 file_size, start_offset, next_offset, stop_offset; + guint64 next_pull_offset; + + pthread_t push_thread; pthread_mutex_t mutex; pthread_cond_t init_cond; bool no_more_pads, has_duration, error; - pthread_cond_t read_cond; + pthread_cond_t read_cond, read_done_cond; struct { + void *data; uint64_t offset; uint32_t size; - bool pending; + bool done; + bool ret; } read_request; bool flushing, sink_connected; @@ -515,7 +522,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex); - while (parser->sink_connected && !parser->read_request.pending) + while (parser->sink_connected && !parser->read_request.data) pthread_cond_wait(&parser->read_cond, &parser->mutex); if (!parser->sink_connected) @@ -534,69 +541,15 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) { - GstMessage *message; - GstFlowReturn ret; - GstBuffer *buffer; - GError *error; - - if (!data) - { - pthread_mutex_lock(&parser->mutex); - - if (parser->sink_connected) - { - error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); - message = gst_message_new_error(NULL, error, ""); - gst_bus_post(parser->bus, message); - parser->read_request.pending = false; - } - - pthread_mutex_unlock(&parser->mutex); - return; - } - - if (!size) - { - pthread_mutex_lock(&parser->mutex); - - if (parser->sink_connected) - g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); - parser->read_request.pending = false; - - pthread_mutex_unlock(&parser->mutex); - return; - } - - /* We will always perform an extra blit here. We can avoid this in some - * cases by wrapping a client-allocated buffer using - * gst_buffer_new_wrapped(). However, releasing the memory is non-trivial, - * since GStreamer will hold onto a reference for an arbitrarily long - * period of time. Until there's evidence to suggest that the blit causes a - * performance problem, leave it alone. */ - buffer = gst_buffer_new_and_alloc(size); - gst_buffer_fill(buffer, 0, data, size); - pthread_mutex_lock(&parser->mutex); - - if (!parser->sink_connected) - { - pthread_mutex_unlock(&parser->mutex); - gst_buffer_unref(buffer); - return; - } - - assert(parser->read_request.pending); - - GST_BUFFER_OFFSET(buffer) = parser->read_request.offset; - g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret); - - /* In random-access mode, GST_FLOW_EOS shouldn't be returned. */ - assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING); - if (ret == GST_FLOW_OK) - parser->read_request.offset += size; - - parser->read_request.pending = false; + parser->read_request.size = size; + parser->read_request.done = true; + parser->read_request.ret = !!data; + if (data) + memcpy(parser->read_request.data, data, size); + parser->read_request.data = NULL; pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->read_done_cond); } static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser) @@ -1251,56 +1204,196 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user) g_free(name); } -static void src_need_data(GstElement *appsrc, guint length, gpointer user) +static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, + guint64 offset, guint size, GstBuffer **buffer) { - struct wg_parser *parser = user; - guint64 queued_bytes; + struct wg_parser *parser = gst_pad_get_element_private(pad); + GstBuffer *new_buffer = NULL; + GstMapInfo map_info; + bool ret; + + GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer); + + if (offset == GST_BUFFER_OFFSET_NONE) + offset = parser->next_pull_offset; + parser->next_pull_offset = offset + size; + + if (!*buffer) + *buffer = new_buffer = gst_buffer_new_and_alloc(size); + + gst_buffer_map(*buffer, &map_info, GST_MAP_WRITE); pthread_mutex_lock(&parser->mutex); - /* As of GStreamer 1.18, appsrc suffers from a race condition. When in - * random access mode (and when underlyingly in pull mode), appsrc may - * spuriously send multiple requests for the same offset and length. If it - * receives the same buffer twice (or consecutive buffers), it will blindly - * queue them and satisfy subsequent getrange requests from downstream - * elements with the wrong buffers. - * - * Internally, this function is called inside of a loop, which also pops - * buffers from the internal queue. Accordingly we can safely treat this - * request as spurious by checking if we have already sent data; since the - * data is consumed by this thread it will not have been consumed yet. - * - * The bug is documented in greater detail here: - * - * https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937 - */ - g_object_get(G_OBJECT(appsrc), "current-level-bytes", &queued_bytes, NULL); - if (queued_bytes) - { - pthread_mutex_unlock(&parser->mutex); - return; - } - - parser->read_request.pending = true; - parser->read_request.size = length; - + assert(!parser->read_request.data); + parser->read_request.data = map_info.data; + parser->read_request.offset = offset; + parser->read_request.size = size; + parser->read_request.done = false; pthread_cond_signal(&parser->read_cond); + /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect + * the upstream pin to flush if necessary. We should never be blocked on + * read_thread() not running. */ + + while (!parser->read_request.done) + pthread_cond_wait(&parser->read_done_cond, &parser->mutex); + + ret = parser->read_request.ret; + gst_buffer_set_size(*buffer, parser->read_request.size); + pthread_mutex_unlock(&parser->mutex); + + gst_buffer_unmap(*buffer, &map_info); + + GST_LOG("Request returned %d.", ret); + + if ((!ret || !size) && new_buffer) + gst_buffer_unref(new_buffer); + + if (ret) + return size ? GST_FLOW_OK : GST_FLOW_EOS; + return GST_FLOW_ERROR; } -static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user) +static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) { - struct wg_parser *parser = user; + struct wg_parser *parser = gst_pad_get_element_private(pad); + GstFormat format; - pthread_mutex_lock(&parser->mutex); + GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query)); - assert(!parser->read_request.pending); - parser->read_request.offset = offset; + switch (GST_QUERY_TYPE(query)) + { + case GST_QUERY_DURATION: + gst_query_parse_duration(query, &format, NULL); + if (format == GST_FORMAT_PERCENT) + { + gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX); + return TRUE; + } + else if (format == GST_FORMAT_BYTES) + { + gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size); + return TRUE; + } + return FALSE; - pthread_mutex_unlock(&parser->mutex); + case GST_QUERY_SEEKING: + gst_query_parse_seeking (query, &format, NULL, NULL, NULL); + if (format != GST_FORMAT_BYTES) + { + GST_WARNING("Cannot seek using format \"%s\".", gst_format_get_name(format)); + return FALSE; + } + gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size); + return TRUE; - return true; + case GST_QUERY_SCHEDULING: + gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0); + gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH); + gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL); + return TRUE; + + default: + GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query)); + return FALSE; + } +} + +static void *push_data(void *arg) +{ + struct wg_parser *parser = arg; + GstBuffer *buffer; + guint max_size; + + GST_DEBUG("Starting push thread."); + + if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL))) + { + GST_ERROR("Failed to allocate memory."); + return NULL; + } + + max_size = parser->stop_offset ? parser->stop_offset : parser->file_size; + + for (;;) + { + ULONG size; + int ret; + + if (parser->next_offset >= max_size) + break; + size = min(16384, max_size - parser->next_offset); + + if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0) + { + GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret)); + break; + } + + parser->next_offset += size; + + buffer->duration = buffer->pts = -1; + if ((ret = gst_pad_push(parser->my_src, buffer)) < 0) + { + GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret)); + break; + } + } + + gst_buffer_unref(buffer); + + gst_pad_push_event(parser->my_src, gst_event_new_eos()); + + GST_DEBUG("Stopping push thread."); + + return NULL; +} + +static gboolean activate_push(GstPad *pad, gboolean activate) +{ + struct wg_parser *parser = gst_pad_get_element_private(pad); + + if (!activate) + { + if (parser->push_thread) + { + pthread_join(parser->push_thread, NULL); + parser->push_thread = 0; + } + } + else if (!parser->push_thread) + { + int ret; + + if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser))) + { + GST_ERROR("Failed to create push thread: %s", strerror(errno)); + parser->push_thread = 0; + return FALSE; + } + } + return TRUE; +} + +static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) +{ + struct wg_parser *parser = gst_pad_get_element_private(pad); + + GST_DEBUG("%s source pad for parser %p in %s mode.", + activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode)); + + switch (mode) + { + case GST_PAD_MODE_PULL: + return TRUE; + case GST_PAD_MODE_PUSH: + return activate_push(pad, activate); + case GST_PAD_MODE_NONE: + break; + } + return FALSE; } static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user) @@ -1347,11 +1440,89 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use return GST_BUS_DROP; } +static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event) +{ + BOOL thread = !!parser->push_thread; + GstSeekType cur_type, stop_type; + GstFormat seek_format; + GstEvent *flush_event; + GstSeekFlags flags; + gint64 cur, stop; + guint32 seqnum; + gdouble rate; + + gst_event_parse_seek(event, &rate, &seek_format, &flags, + &cur_type, &cur, &stop_type, &stop); + + if (seek_format != GST_FORMAT_BYTES) + { + GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(seek_format)); + return FALSE; + } + + seqnum = gst_event_get_seqnum(event); + + /* send flush start */ + if (flags & GST_SEEK_FLAG_FLUSH) + { + flush_event = gst_event_new_flush_start(); + gst_event_set_seqnum(flush_event, seqnum); + gst_pad_push_event(parser->my_src, flush_event); + if (thread) + gst_pad_set_active(parser->my_src, 1); + } + + parser->next_offset = parser->start_offset = cur; + + /* and prepare to continue streaming */ + if (flags & GST_SEEK_FLAG_FLUSH) + { + flush_event = gst_event_new_flush_stop(TRUE); + gst_event_set_seqnum(flush_event, seqnum); + gst_pad_push_event(parser->my_src, flush_event); + if (thread) + gst_pad_set_active(parser->my_src, 1); + } + + return TRUE; +} + +static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) +{ + struct wg_parser *parser = gst_pad_get_element_private(pad); + gboolean ret = TRUE; + + GST_LOG("parser %p, type \"%s\".", parser, GST_EVENT_TYPE_NAME(event)); + + switch (event->type) + { + case GST_EVENT_SEEK: + ret = src_perform_seek(parser, event); + break; + + case GST_EVENT_FLUSH_START: + case GST_EVENT_FLUSH_STOP: + case GST_EVENT_QOS: + case GST_EVENT_RECONFIGURE: + break; + + default: + GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event)); + ret = FALSE; + break; + } + gst_event_unref(event); + return ret; +} + static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) { + GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src", + GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); unsigned int i; int ret; + parser->file_size = file_size; parser->sink_connected = true; if (!parser->bus) @@ -1363,16 +1534,15 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s parser->container = gst_bin_new(NULL); gst_element_set_bus(parser->container, parser->bus); - if (!(parser->appsrc = create_element("appsrc", "base"))) - return E_FAIL; - gst_bin_add(GST_BIN(parser->container), parser->appsrc); + parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src"); + gst_pad_set_getrange_function(parser->my_src, src_getrange_cb); + gst_pad_set_query_function(parser->my_src, src_query_cb); + gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb); + gst_pad_set_event_function(parser->my_src, src_event_cb); + gst_pad_set_element_private(parser->my_src, parser); - g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); - g_object_set(parser->appsrc, "size", file_size, NULL); - g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); - g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser); - - parser->read_request.offset = 0; + parser->start_offset = parser->next_offset = parser->stop_offset = 0; + parser->next_pull_offset = 0; parser->error = false; if (!parser->init_gst(parser)) @@ -1468,11 +1638,18 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s pthread_mutex_unlock(&parser->mutex); + parser->next_offset = 0; return S_OK; out: if (parser->container) gst_element_set_state(parser->container, GST_STATE_NULL); + if (parser->their_sink) + { + gst_pad_unlink(parser->my_src, parser->their_sink); + gst_object_unref(parser->their_sink); + parser->my_src = parser->their_sink = NULL; + } for (i = 0; i < parser->stream_count; ++i) free_stream(parser->streams[i]); @@ -1509,6 +1686,10 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex); gst_element_set_state(parser->container, GST_STATE_NULL); + gst_pad_unlink(parser->my_src, parser->their_sink); + gst_object_unref(parser->my_src); + gst_object_unref(parser->their_sink); + parser->my_src = parser->their_sink = NULL; pthread_mutex_lock(&parser->mutex); parser->sink_connected = false; @@ -1530,6 +1711,7 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser) static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element; + int ret; if (!(element = create_element("decodebin", "base"))) return FALSE; @@ -1542,13 +1724,15 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser); + parser->their_sink = gst_element_get_static_pad(element, "sink"); + pthread_mutex_lock(&parser->mutex); parser->no_more_pads = false; pthread_mutex_unlock(&parser->mutex); - if (!gst_element_link(parser->appsrc, parser->decodebin)) + if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) { - GST_ERROR("Failed to link appsrc.\n"); + GST_ERROR("Failed to link pads, error %d.\n", ret); return FALSE; } @@ -1558,6 +1742,7 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) static BOOL avi_parser_init_gst(struct wg_parser *parser) { GstElement *element; + int ret; if (!(element = create_element("avidemux", "good"))) return FALSE; @@ -1568,13 +1753,15 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser); + parser->their_sink = gst_element_get_static_pad(element, "sink"); + pthread_mutex_lock(&parser->mutex); parser->no_more_pads = false; pthread_mutex_unlock(&parser->mutex); - if (!gst_element_link(parser->appsrc, element)) + if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) { - GST_ERROR("Failed to link appsrc.\n"); + GST_ERROR("Failed to link pads, error %d.\n", ret); return FALSE; } @@ -1592,9 +1779,10 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser) gst_bin_add(GST_BIN(parser->container), element); - if (!gst_element_link(parser->appsrc, element)) + parser->their_sink = gst_element_get_static_pad(element, "sink"); + if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) { - GST_ERROR("Failed to link appsrc.\n"); + GST_ERROR("Failed to link sink pads, error %d.\n", ret); return FALSE; } @@ -1625,9 +1813,10 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser) gst_bin_add(GST_BIN(parser->container), element); - if (!gst_element_link(parser->appsrc, element)) + parser->their_sink = gst_element_get_static_pad(element, "sink"); + if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) { - GST_ERROR("Failed to link appsrc.\n"); + GST_ERROR("Failed to link sink pads, error %d.\n", ret); return FALSE; } @@ -1658,6 +1847,7 @@ static struct wg_parser *wg_parser_create(void) pthread_mutex_init(&parser->mutex, NULL); pthread_cond_init(&parser->init_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); + pthread_cond_init(&parser->read_done_cond, NULL); parser->flushing = true; GST_DEBUG("Created winegstreamer parser %p.\n", parser); @@ -1711,6 +1901,7 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) pthread_mutex_destroy(&parser->mutex); pthread_cond_destroy(&parser->init_cond); pthread_cond_destroy(&parser->read_cond); + pthread_cond_destroy(&parser->read_done_cond); free(parser); } -- 2.33.0