From: Derek Lesho Subject: [PATCH v2 4/5] winegstreamer: Use decodebin to initialize media streams. Message-Id: <20200826185912.749994-4-dlesho@codeweavers.com> Date: Wed, 26 Aug 2020 13:59:11 -0500 In-Reply-To: <20200826185912.749994-1-dlesho@codeweavers.com> References: <20200826185912.749994-1-dlesho@codeweavers.com> Signed-off-by: Derek Lesho --- dlls/winegstreamer/gst_cbs.c | 47 +++- dlls/winegstreamer/gst_cbs.h | 8 + dlls/winegstreamer/media_source.c | 418 +++++++++++++++++++++++++++++- 3 files changed, 471 insertions(+), 2 deletions(-) diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c index dfe33dd6277..4755f5b42f1 100644 --- a/dlls/winegstreamer/gst_cbs.c +++ b/dlls/winegstreamer/gst_cbs.c @@ -358,4 +358,49 @@ gboolean process_bytestream_pad_event_wrapper(GstPad *pad, GstObject *parent, Gs call_cb(&cbdata); return cbdata.u.event_src_data.ret; -} \ No newline at end of file +} + +GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpointer user) +{ + struct cb_data cbdata = { WATCH_SOURCE_BUS }; + + cbdata.u.watch_bus_data.bus = bus; + cbdata.u.watch_bus_data.msg = message; + cbdata.u.watch_bus_data.user = user; + + call_cb(&cbdata); + + return cbdata.u.watch_bus_data.ret; +} + +void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user) +{ + struct cb_data cbdata = { SOURCE_STREAM_ADDED }; + + cbdata.u.pad_added_data.element = bin; + cbdata.u.pad_added_data.pad = pad; + cbdata.u.pad_added_data.user = user; + + call_cb(&cbdata); +} + +void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user) +{ + struct cb_data cbdata = { SOURCE_STREAM_REMOVED }; + + cbdata.u.pad_removed_data.element = element; + cbdata.u.pad_removed_data.pad = pad; + cbdata.u.pad_removed_data.user = user; + + call_cb(&cbdata); +} + +void source_all_streams_wrapper(GstElement *element, gpointer user) +{ + struct cb_data cbdata = { SOURCE_ALL_STREAMS }; + + cbdata.u.no_more_pads_data.element = element; + cbdata.u.no_more_pads_data.user = user; + + call_cb(&cbdata); +} diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h index 10e999feea7..d87cc8c21e9 100644 --- a/dlls/winegstreamer/gst_cbs.h +++ b/dlls/winegstreamer/gst_cbs.h @@ -48,6 +48,10 @@ enum CB_TYPE { QUERY_BYTESTREAM, ACTIVATE_BYTESTREAM_PAD_MODE, PROCESS_BYTESTREAM_PAD_EVENT, + WATCH_SOURCE_BUS, + SOURCE_STREAM_ADDED, + SOURCE_STREAM_REMOVED, + SOURCE_ALL_STREAMS, MEDIA_SOURCE_MAX, }; @@ -164,5 +168,9 @@ GstFlowReturn pull_from_bytestream_wrapper(GstPad *pad, GstObject *parent, guint gboolean query_bytestream_wrapper(GstPad *pad, GstObject *parent, GstQuery *query) DECLSPEC_HIDDEN; gboolean activate_bytestream_pad_mode_wrapper(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) DECLSPEC_HIDDEN; gboolean process_bytestream_pad_event_wrapper(GstPad *pad, GstObject *parent, GstEvent *event) DECLSPEC_HIDDEN; +GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpointer user) DECLSPEC_HIDDEN; +void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; +void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; +void source_all_streams_wrapper(GstElement *element, gpointer user) DECLSPEC_HIDDEN; #endif diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index ba3de001738..fa0e1065ea7 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -5,6 +5,7 @@ #include "gst_private.h" #include "gst_cbs.h" +#include #include #define COBJMACROS @@ -31,6 +32,23 @@ static struct source_desc } }; +struct media_stream +{ + IMFMediaStream IMFMediaStream_iface; + LONG ref; + struct media_source *parent_source; + IMFMediaEventQueue *event_queue; + GstElement *appsink; + GstPad *their_src, *my_sink; + /* usually reflects state of source */ + enum + { + STREAM_STUB, + STREAM_INACTIVE, + STREAM_SHUTDOWN, + } state; +}; + struct media_source { IMFMediaSource IMFMediaSource_iface; @@ -38,15 +56,26 @@ struct media_source enum source_type type; IMFMediaEventQueue *event_queue; IMFByteStream *byte_stream; - GstPad *my_src; + struct media_stream **streams; + ULONG stream_count; + GstBus *bus; + GstElement *container; + GstElement *decodebin; + GstPad *my_src, *their_sink; enum { SOURCE_OPENING, SOURCE_STOPPED, SOURCE_SHUTDOWN, } state; + HANDLE all_streams_event; }; +static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) +{ + return CONTAINING_RECORD(iface, struct media_stream, IMFMediaStream_iface); +} + static inline struct media_source *impl_from_IMFMediaSource(IMFMediaSource *iface) { return CONTAINING_RECORD(iface, struct media_source, IMFMediaSource_iface); @@ -200,6 +229,243 @@ static gboolean process_bytestream_pad_event(GstPad *pad, GstObject *parent, Gst return TRUE; } +GstBusSyncReply watch_source_bus(GstBus *bus, GstMessage *message, gpointer user) +{ + struct media_source *source = (struct media_source *) user; + gchar *dbg_info = NULL; + GError *err = NULL; + + TRACE("source %p message type %s\n", source, GST_MESSAGE_TYPE_NAME(message)); + + switch (message->type) + { + case GST_MESSAGE_ERROR: + gst_message_parse_error(message, &err, &dbg_info); + ERR("%s: %s\n", GST_OBJECT_NAME(message->src), err->message); + ERR("%s\n", dbg_info); + g_error_free(err); + g_free(dbg_info); + break; + case GST_MESSAGE_WARNING: + gst_message_parse_warning(message, &err, &dbg_info); + WARN("%s: %s\n", GST_OBJECT_NAME(message->src), err->message); + WARN("%s\n", dbg_info); + g_error_free(err); + g_free(dbg_info); + break; + default: + break; + } + + return GST_BUS_PASS; +} + +static HRESULT WINAPI media_stream_QueryInterface(IMFMediaStream *iface, REFIID riid, void **out) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + TRACE("(%p)->(%s %p)\n", stream, debugstr_guid(riid), out); + + if (IsEqualIID(riid, &IID_IMFMediaStream) || + IsEqualIID(riid, &IID_IMFMediaEventGenerator) || + IsEqualIID(riid, &IID_IUnknown)) + { + *out = &stream->IMFMediaStream_iface; + } + else + { + FIXME("(%s, %p)\n", debugstr_guid(riid), out); + *out = NULL; + return E_NOINTERFACE; + } + + IUnknown_AddRef((IUnknown*)*out); + return S_OK; +} + +static ULONG WINAPI media_stream_AddRef(IMFMediaStream *iface) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + ULONG ref = InterlockedIncrement(&stream->ref); + + TRACE("(%p) ref=%u\n", stream, ref); + + return ref; +} + +static ULONG WINAPI media_stream_Release(IMFMediaStream *iface) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + ULONG ref = InterlockedDecrement(&stream->ref); + + TRACE("(%p) ref=%u\n", stream, ref); + + if (!ref) + { + if (stream->my_sink) + gst_object_unref(GST_OBJECT(stream->my_sink)); + if (stream->event_queue) + IMFMediaEventQueue_Release(stream->event_queue); + if (stream->parent_source) + IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface); + + heap_free(stream); + } + + return ref; +} + +static HRESULT WINAPI media_stream_GetEvent(IMFMediaStream *iface, DWORD flags, IMFMediaEvent **event) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + TRACE("(%p)->(%#x, %p)\n", stream, flags, event); + + if (stream->state == STREAM_SHUTDOWN) + return MF_E_SHUTDOWN; + + return IMFMediaEventQueue_GetEvent(stream->event_queue, flags, event); +} + +static HRESULT WINAPI media_stream_BeginGetEvent(IMFMediaStream *iface, IMFAsyncCallback *callback, IUnknown *state) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + TRACE("(%p)->(%p, %p)\n", stream, callback, state); + + if (stream->state == STREAM_SHUTDOWN) + return MF_E_SHUTDOWN; + + return IMFMediaEventQueue_BeginGetEvent(stream->event_queue, callback, state); +} + +static HRESULT WINAPI media_stream_EndGetEvent(IMFMediaStream *iface, IMFAsyncResult *result, IMFMediaEvent **event) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + TRACE("(%p)->(%p, %p)\n", stream, result, event); + + if (stream->state == STREAM_SHUTDOWN) + return MF_E_SHUTDOWN; + + return IMFMediaEventQueue_EndGetEvent(stream->event_queue, result, event); +} + +static HRESULT WINAPI media_stream_QueueEvent(IMFMediaStream *iface, MediaEventType event_type, REFGUID ext_type, + HRESULT hr, const PROPVARIANT *value) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + TRACE("(%p)->(%d, %s, %#x, %p)\n", stream, event_type, debugstr_guid(ext_type), hr, value); + + if (stream->state == STREAM_SHUTDOWN) + return MF_E_SHUTDOWN; + + return IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, event_type, ext_type, hr, value); +} + +static HRESULT WINAPI media_stream_GetMediaSource(IMFMediaStream *iface, IMFMediaSource **source) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + FIXME("stub (%p)->(%p)\n", stream, source); + + if (stream->state == STREAM_SHUTDOWN) + return MF_E_SHUTDOWN; + + return E_NOTIMPL; +} + +static HRESULT WINAPI media_stream_GetStreamDescriptor(IMFMediaStream* iface, IMFStreamDescriptor **descriptor) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + TRACE("(%p)->(%p)\n", stream, descriptor); + + if (stream->state == STREAM_SHUTDOWN) + return MF_E_SHUTDOWN; + + return E_NOTIMPL; +} + +static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown *token) +{ + struct media_stream *stream = impl_from_IMFMediaStream(iface); + + TRACE("(%p)->(%p)\n", iface, token); + + if (stream->state == STREAM_SHUTDOWN) + return MF_E_SHUTDOWN; + + return E_NOTIMPL; +} + +static const IMFMediaStreamVtbl media_stream_vtbl = +{ + media_stream_QueryInterface, + media_stream_AddRef, + media_stream_Release, + media_stream_GetEvent, + media_stream_BeginGetEvent, + media_stream_EndGetEvent, + media_stream_QueueEvent, + media_stream_GetMediaSource, + media_stream_GetStreamDescriptor, + media_stream_RequestSample +}; + +/* creates a stub stream */ +static HRESULT new_media_stream(struct media_source *source, GstPad *pad, DWORD stream_id, struct media_stream **out_stream) +{ + struct media_stream *object = heap_alloc_zero(sizeof(*object)); + HRESULT hr; + + TRACE("(%p %p)->(%p)\n", source, pad, out_stream); + + object->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl; + object->ref = 1; + + IMFMediaSource_AddRef(&source->IMFMediaSource_iface); + object->parent_source = source; + object->their_src = pad; + + object->state = STREAM_STUB; + + if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) + goto fail; + + if (!(object->appsink = gst_element_factory_make("appsink", NULL))) + { + hr = E_OUTOFMEMORY; + goto fail; + } + gst_bin_add(GST_BIN(object->parent_source->container), object->appsink); + + g_object_set(object->appsink, "emit-signals", TRUE, NULL); + g_object_set(object->appsink, "sync", FALSE, NULL); + g_object_set(object->appsink, "max-buffers", 5, NULL); + g_object_set(object->appsink, "wait-on-eos", FALSE, NULL); + + object->my_sink = gst_element_get_static_pad(object->appsink, "sink"); + gst_pad_set_element_private(object->my_sink, object); + + gst_pad_link(object->their_src, object->my_sink); + + gst_element_sync_state_with_parent(object->appsink); + + TRACE("->(%p)\n", object); + *out_stream = object; + + return S_OK; + + fail: + WARN("Failed to construct media stream, hr %#x.\n", hr); + + IMFMediaStream_Release(&object->IMFMediaStream_iface); + return hr; +} + static HRESULT WINAPI media_source_QueryInterface(IMFMediaSource *iface, REFIID riid, void **out) { struct media_source *source = impl_from_IMFMediaSource(iface); @@ -360,13 +626,34 @@ static HRESULT WINAPI media_source_Pause(IMFMediaSource *iface) static HRESULT media_source_teardown(struct media_source *source) { + if (source->container) + { + gst_element_set_state(source->container, GST_STATE_NULL); + gst_object_unref(GST_OBJECT(source->container)); + } + if (source->my_src) gst_object_unref(GST_OBJECT(source->my_src)); + if (source->their_sink) + gst_object_unref(GST_OBJECT(source->their_sink)); + if (source->event_queue) IMFMediaEventQueue_Release(source->event_queue); if (source->byte_stream) IMFByteStream_Release(source->byte_stream); + for (unsigned int i = 0; i < source->stream_count; i++) + { + source->streams[i]->state = STREAM_SHUTDOWN; + IMFMediaStream_Release(&source->streams[i]->IMFMediaStream_iface); + } + + if (source->stream_count) + heap_free(source->streams); + + if (source->all_streams_event) + CloseHandle(source->all_streams_event); + return S_OK; } @@ -397,6 +684,63 @@ static const IMFMediaSourceVtbl IMFMediaSource_vtbl = media_source_Shutdown, }; +static void source_stream_added(GstElement *element, GstPad *pad, gpointer user) +{ + struct media_source *source = (struct media_source *) user; + struct media_stream **new_stream_array; + struct media_stream *stream; + gchar *g_stream_id; + DWORD stream_id; + + if (gst_pad_get_direction(pad) != GST_PAD_SRC) + return; + + /* Most/All seen randomly calculate the initial part of the stream id, the last three digits are the only deterministic part */ + g_stream_id = GST_PAD_NAME(pad); + sscanf(strstr(g_stream_id, "_"), "_%u", &stream_id); + + TRACE("stream-id: %u\n", stream_id); + + if (FAILED(new_media_stream(source, pad, stream_id, &stream))) + { + goto leave; + } + + if (!(new_stream_array = heap_realloc(source->streams, (source->stream_count + 1) * (sizeof(*new_stream_array))))) + { + ERR("Failed to add stream to source\n"); + goto leave; + } + + source->streams = new_stream_array; + source->streams[source->stream_count++] = stream; + + leave: + return; +} + +static void source_stream_removed(GstElement *element, GstPad *pad, gpointer user) +{ + struct media_source *source = (struct media_source *)user; + + for (unsigned int i = 0; i < source->stream_count; i++) + { + struct media_stream *stream = source->streams[i]; + if (stream->their_src != pad) + continue; + stream->their_src = NULL; + if (stream->state != STREAM_INACTIVE) + stream->state = STREAM_INACTIVE; + } +} + +static void source_all_streams(GstElement *element, gpointer user) +{ + struct media_source *source = (struct media_source *) user; + + SetEvent(source->all_streams_event); +} + static HRESULT media_source_constructor(IMFByteStream *bytestream, enum source_type type, struct media_source **out_media_source) { GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE( @@ -407,6 +751,7 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, enum source_t struct media_source *object = heap_alloc_zero(sizeof(*object)); HRESULT hr; + int ret; if (!object) return E_OUTOFMEMORY; @@ -416,10 +761,16 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, enum source_t object->type = type; object->byte_stream = bytestream; IMFByteStream_AddRef(bytestream); + object->all_streams_event = CreateEventA(NULL, FALSE, FALSE, NULL); if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) goto fail; + object->container = gst_bin_new(NULL); + object->bus = gst_bus_new(); + gst_bus_set_sync_handler(object->bus, watch_source_bus_wrapper, object, NULL); + gst_element_set_bus(object->container, object->bus); + object->my_src = gst_pad_new_from_static_template(&src_template, "mf-src"); gst_pad_set_element_private(object->my_src, object); gst_pad_set_getrange_function(object->my_src, pull_from_bytestream_wrapper); @@ -427,6 +778,47 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, enum source_t gst_pad_set_activatemode_function(object->my_src, activate_bytestream_pad_mode_wrapper); gst_pad_set_event_function(object->my_src, process_bytestream_pad_event_wrapper); + object->decodebin = gst_element_factory_make("decodebin", NULL); + if (!(object->decodebin)) + { + WARN("Failed to create decodebin for source\n"); + hr = E_OUTOFMEMORY; + goto fail; + } + /* the appsinks determine the maximum amount of buffering instead, this means that if one stream isn't read, a leak will happen, like on windows */ + g_object_set(object->decodebin, "max-size-buffers", 0, NULL); + g_object_set(object->decodebin, "max-size-time", G_GUINT64_CONSTANT(0), NULL); + g_object_set(object->decodebin, "max-size-bytes", 0, NULL); + g_object_set(object->decodebin, "sink-caps", gst_static_caps_get(&source_descs[type].bytestream_caps), NULL); + + gst_bin_add(GST_BIN(object->container), object->decodebin); + + g_signal_connect(object->decodebin, "pad-added", G_CALLBACK(source_stream_added_wrapper), object); + g_signal_connect(object->decodebin, "pad-removed", G_CALLBACK(source_stream_removed_wrapper), object); + g_signal_connect(object->decodebin, "no-more-pads", G_CALLBACK(source_all_streams_wrapper), object); + + object->their_sink = gst_element_get_static_pad(object->decodebin, "sink"); + + if ((ret = gst_pad_link(object->my_src, object->their_sink)) < 0) + { + WARN("Failed to link our bytestream pad to the demuxer input\n"); + hr = E_OUTOFMEMORY; + goto fail; + } + + object->state = SOURCE_OPENING; + + gst_element_set_state(object->container, GST_STATE_PAUSED); + ret = gst_element_get_state(object->container, NULL, NULL, -1); + if (ret == GST_STATE_CHANGE_FAILURE) + { + ERR("Failed to play source.\n"); + hr = E_OUTOFMEMORY; + goto fail; + } + + WaitForSingleObject(object->all_streams_event, INFINITE); + object->state = SOURCE_STOPPED; *out_media_source = object; @@ -933,6 +1325,30 @@ void perform_cb_media_source(struct cb_data *cbdata) cbdata->u.event_src_data.ret = process_bytestream_pad_event(data->pad, data->parent, data->event); break; } + case WATCH_SOURCE_BUS: + { + struct watch_bus_data *data = &cbdata->u.watch_bus_data; + cbdata->u.watch_bus_data.ret = watch_source_bus(data->bus, data->msg, data->user); + break; + } + case SOURCE_STREAM_ADDED: + { + struct pad_added_data *data = &cbdata->u.pad_added_data; + source_stream_added(data->element, data->pad, data->user); + break; + } + case SOURCE_STREAM_REMOVED: + { + struct pad_removed_data *data = &cbdata->u.pad_removed_data; + source_stream_removed(data->element, data->pad, data->user); + break; + } + case SOURCE_ALL_STREAMS: + { + struct no_more_pads_data *data = &cbdata->u.no_more_pads_data; + source_all_streams(data->element, data->user); + break; + } default: { ERR("Wrong callback forwarder called\n"); -- 2.28.0