From: Derek Lesho Subject: Re: [PATCH v1 3/3] mfreadwrite: Implement MF_SOURCE_READER_ASYNC_CALLBACK. Message-Id: <7cb9f43f-2d80-8acc-955e-5f3fa9b2a28f@codeweavers.com> Date: Wed, 18 Mar 2020 13:18:53 -0500 In-Reply-To: <14df2f61-94b4-5511-ecfa-a5f142dc8852@codeweavers.com> References: <20200317164845.245077-1-dlesho@codeweavers.com> <20200317164845.245077-3-dlesho@codeweavers.com> <14df2f61-94b4-5511-ecfa-a5f142dc8852@codeweavers.com> On 2020-03-18 08:37, Nikolay Sivov wrote: > > > On 3/17/20 7:48 PM, Derek Lesho wrote: >> @@ -587,6 +594,8 @@ static ULONG WINAPI >> src_reader_Release(IMFSourceReader *iface) >>                   list_remove(&ptr->entry); >>                   heap_free(ptr); >>               } >> + >> +            MFUnlockWorkQueue(stream->read_samples_queue); >>           } >>           heap_free(reader->streams); >>           DeleteCriticalSection(&reader->cs); > Is additional queue necessary for synchronous case? No, I can remove it in that case. > >> +    if (FAILED(hr = media_stream_get_id(state, &id))) >> +    { >> +        WARN("Bad stream %p, hr %#x.\n", state, hr); >> +    } >> + >> +    for (unsigned int i = 0; i < reader->stream_count; ++i) >> +    { >> +        if (id == reader->streams[i].id) >> +        { >> +            struct media_stream *stream = &reader->streams[i]; >> +            IMFSample *sample = NULL; >> +            DWORD stream_flags; >> +            LONGLONG timestamp = 0; >> + >> +            hr = next_sample(stream, &sample, &stream_flags, FALSE); >> +            if (sample) >> +            { >> +                IMFSample_GetSampleTime(sample, ×tamp); >> +            } >> + >> +            TRACE("Invoking read sample callback %p with (hr = %#x, >> stream_idx = %u, flags = %#x, timestamp %lu, sample %p)\n", >> reader->async_callback, hr, i, stream_flags, timestamp, sample); >> +            hr = >> IMFSourceReaderCallback_OnReadSample(reader->async_callback, hr, i, >> stream_flags, timestamp, sample); >> +            IMFSample_Release(sample); >> +            return hr; >> +        } >> +    } >> + >> +    return S_OK; > We already talked about this part - blocking in next_sample(), even on > dedicated thread, is unnecessary. > > What will happen is MENewSample from stream queue -> wake thread in > dedicated per-stream queue -> process -> call OnReadSample. > Without blocking you'll have MENewSample -> process on same event > handling thread -> call OnReadSample. > > I don't see a need for additional complexity. This solution, in my opinion, reduces complexity, as it allows us to share the same logic between the asynchronous and synchronous paths.  Additionally, what happens if a single input sample produces multiple output samples, do we fall back to the work queue method in that case?  The only difference in the two paths you've laid out is that the work queue approach involves waking up another thread, in the same way that we will have to wake up the thread blocking on ReadSample. > >> +    switch (index) >> +    { >> +        case MF_SOURCE_READER_FIRST_VIDEO_STREAM: >> +            stream_index = reader->first_video_stream_index; >> +            break; >> +        case MF_SOURCE_READER_FIRST_AUDIO_STREAM: >> +            stream_index = reader->first_audio_stream_index; >> +            break; >> +        case MF_SOURCE_READER_ANY_STREAM: >> +            FIXME("Non-specific requests are not supported.\n"); >> +            return E_NOTIMPL; >> +        default: >> +            stream_index = index; >> +    } >> + >> +    /* Can't read from deselected streams. */ >> +    if (FAILED(hr = source_reader_get_stream_selection(reader, >> stream_index, &selected)) && !selected) >> +        return hr; > If that's how it works, this part applies to both sync and async cases. Sorry, typo here. > >> + EnterCriticalSection(&stream->cs); >> +    while(!stream->stream) >> +    { >> +        SleepConditionVariableCS(&stream->sample_event, &stream->cs, >> INFINITE); >> +    } >> +    LeaveCriticalSection(&stream->cs); >> + >> +    TRACE("Dispatching read sample callback for stream %p\n", >> stream->stream); >> +    if (FAILED(hr = MFPutWorkItem(stream->read_samples_queue, >> &reader->read_samples_callback, (IUnknown*)stream->stream))) >> +    { >> +        WARN("Failed to submit item hr = %#x\n", hr); >> +        return E_FAIL; >> +    } > This blocks for indefinite amount of time, for ReadSample method > that's supposed to be non-blocking. It's correct of course to start > the source here, > which also could be potentially shared with sync case, but waiting for > new samples, using 'sample_event' that's not meant for it, is wrong. > > It should start source, and keep a note of number of times each stream > was requested, later on MENewStream/MEUpdatedStream you can issue that > many requests at once. Ack.