Converting a C callback API into a Stream



I have access to PulseAudio through an asynchronous C API.

pub fn get_sink_info_by_index<F>(
    &self,
    index: u32,
    callback: F
) -> Operation<dyn FnMut(ListResult<&SinkInfo>)> where
    F: FnMut(ListResult<&SinkInfo>) + 'static,

Source: https://docs.rs/libpulse-binding/2.16.1/libpulse_binding/context/introspect/struct.Introspector.html#method.get_sink_info_by_index

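I have implemented a helper function (which compiles fine on its own) that produces two things: a callback to pass to the C function, and an output stream.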

pub fn callbackStream<T:Sized + Clone>() -> (Box<dyn FnMut(ListResult<&'static T>)>, Box<dyn Stream<Item=T>>) {
    let (sender, recv) = channel(1024); // TODO channel size?
    let cb  = {|c: ListResult<&'static T>| match c {
        Item(it) => match sender.try_send(it.clone()) {
            Ok(_) => (),
            Err(err) => () // TODO
        },
        End => (),
        Error => () // TODO
    }};
    (Box::new(cb), Box::new(recv))
}

However, when I try to use it, I cannot get the lifetimes to work:

    fn stream_sink_info_by_index(
        &self,
        index: u32
    ) -> Box<dyn Stream<Item=SinkInfo>> {
        let (callback, stream): (Box<dyn FnMut(ListResult<&SinkInfo>) + 'static>, Box<dyn Stream<Item=SinkInfo>>) = callbackStream();
        self.get_sink_info_by_index(index, callback);
        stream
    }

This gives me:

error[E0308]: mismatched types
  --> src/futuristicPulse.rs:31:117
   |
31 |         let (callback, stream): (Box<dyn FnMut(ListResult<&SinkInfo>) + 'static>, Box<dyn Stream<Item=SinkInfo>>) = callbackStream();
   |                                 ---------------------------------------------------------------------------------   ^^^^^^^^^^^^^^^^ expected concrete lifetime, found bound lifetime parameter
   |                                 |
   |                                 expected due to this
   |
   = note: expected tuple `(std::boxed::Box<(dyn for<'r, 's> std::ops::FnMut(pulse::callbacks::ListResult<&'r pulse::context::introspect::SinkInfo<'s>>) + 'static)>, std::boxed::Box<dyn futures_core::stream::Stream<Item = pulse::context::introspect::SinkInfo<'_>>>)`
              found tuple `(std::boxed::Box<(dyn std::ops::FnMut(pulse::callbacks::ListResult<&'static _>) + 'static)>, std::boxed::Box<(dyn futures_core::stream::Stream<Item = _> + 'static)>)`

error: aborting due to previous error; 10 warnings emitted

If I change the type annotation on (callback, stream) to the types the type checker found, the error simply moves to the get_sink_info_by_index call:

error[E0271]: type mismatch resolving `for<'r, 's> <std::boxed::Box<dyn std::ops::FnMut(pulse::callbacks::ListResult<&'static pulse::context::introspect::SinkInfo<'_>>)> as std::ops::FnOnce<(pulse::callbacks::ListResult<&'r pulse::context::introspect::SinkInfo<'s>>,)>>::Output == ()`
  --> src/futuristicPulse.rs:32:14
   |
32 |         self.get_sink_info_by_index(index, callback);
   |              ^^^^^^^^^^^^^^^^^^^^^^ expected bound lifetime parameter, found concrete lifetime

error: aborting due to previous error; 10 warnings emitted
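
Both errors point at the same mismatch. The bound `F: FnMut(ListResult<&SinkInfo>)` is higher-ranked (the compiler prints it as `for<'r, 's>`), so the callback must accept a borrow of any lifetime, whereas `callbackStream` returns a callback that only accepts `&'static T`. The following is a minimal sketch of one way to make the helper's callback generic over the borrow lifetime; it is an illustration under stated assumptions, not a drop-in fix. `ListResult`, `fake_list_api`, `callback_channel`, and `collect_items` are hypothetical stand-ins defined here, `std::sync::mpsc` replaces the futures channel, and `u32` replaces `SinkInfo`.

```rust
use std::sync::mpsc::{channel, Receiver};

// Hypothetical stand-in for libpulse's ListResult; not the real type.
#[allow(dead_code)]
enum ListResult<T> {
    Item(T),
    End,
    Error,
}

// Hypothetical stand-in for a callback-taking API such as
// get_sink_info_by_index. The bound is higher-ranked: F must accept a
// reference of any lifetime, because each item is only borrowed for
// the duration of one call.
fn fake_list_api<F>(mut callback: F)
where
    F: FnMut(ListResult<&u32>),
{
    for value in 1u32..=3 {
        let local = value; // lives only for this iteration
        callback(ListResult::Item(&local));
    }
    callback(ListResult::End);
}

// Returns a callback that works for any borrow lifetime, plus a receiver
// of owned items. Writing `&T` without `'static` in the boxed type makes
// the callback higher-ranked, matching the API's bound.
fn callback_channel<T: Clone + 'static>() -> (Box<dyn FnMut(ListResult<&T>)>, Receiver<T>) {
    let (sender, receiver) = channel();
    let cb = move |result: ListResult<&T>| match result {
        ListResult::Item(item) => {
            // Clone into an owned value before the borrow ends;
            // a send error (receiver dropped) is ignored in this sketch.
            let _ = sender.send(item.clone());
        }
        ListResult::End | ListResult::Error => {}
    };
    (Box::new(cb), receiver)
}

// Wires the two pieces together and drains whatever the callback sent.
fn collect_items() -> Vec<u32> {
    let (callback, receiver) = callback_channel::<u32>();
    fake_list_api(callback);
    receiver.try_iter().collect()
}
```

Calling `collect_items()` drains the items that the callback cloned into the channel. With the real `SinkInfo<'s>`, a plain `clone()` would still carry the borrowed lifetime `'s`, so each item would presumably need to be converted into a fully owned value before being sent; that step is not shown here.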
