文章目录

  • 简介
  • Q&A
    • Question 1: 你是选择传递还是共享对象?
    • Question 2: 是否与实时线程交互?
    • Question 3: 共享数据是否足够小?
    • Question 4: 获取共享资源是否允许失败?
    • Question 5:实时线程会修改线程间共享的数据吗?
    • Question 6:非实时线程会修改线程间共享的数据吗?
    • Question 7: 实时线程和非实时线程都会修改数据,这该怎么办?
  • 总结

简介

在 实时音频编程(一)中,我们总结了实时音频编程几条经验法则,先来回顾下它们,在实时线程中,你不能做:

  1. 不要申请或者释放内存
  2. 不要使用锁
  3. 不要进行文件读写,或者其他方式的 I/O(这包括任何 print 或者 NSLog,或者 GUI API)
  4. 不要调用那些可能造成阻塞的系统 api
  5. 不要运行那些执行时间不确定,或者最坏时间复杂度有激增的代码
  6. 不要调用任何有上述行为的代码
  7. 不要调用任何你不信任的代码

在可能的情况下,有几件事你应该做:

  1. 用最坏时间复杂度来衡量算法,选用最坏时间复杂度友好的算法
  2. 在许多音频采样中摊销计算,以平滑CPU的使用,而不是使用偶尔有长处理时间的 "突发 "算法。
  3. 在一个非实时线程中预先分配或预先计算数据
  4. 采用非共享的、仅在音频回调中使用的数据结构,这样你就不需要考虑共享、并发和锁的问题。

接来下,将对 2019 JUCE 开发者大会上分享的 real-time 101 演讲进行总结。Fabian Renn-Giles & Dave Rowland 分享了许多实时编程的实用技巧,我们将通过一系列的问题,来确定不同场景下的解决方案。总结如下图:

Q&A

接下来,你将通过回答一些问题来确定不同场景的解决方案,每种场景都会引入一种工具。

Question 1: 你是选择传递还是共享对象?

让我们进入第一个问题:你是选择传递还是共享对象。如果你选择传递(即复制)对象到另一个线程,那么两个线程之间可以通过 FIFO 来交互。

FIFO(First Input First Output),即先进先出队列。它非常适合用于线程之间的数据传递。在实时编程中,我们常使用 ringbuffer 来实现 FIFO。ringbuffer 有固定的大小,也就意味着没有内存申请,此外它还有很多不同的类型。

就拿最简单的情况来说,即单个生产者单个消费者,它的 wait-free 版本如下:

template <typename T> class fifo{
public:bool push(T && arg){auto pos = writepos.load();auto next = (pos + 1) % slots.size();if(next == readpos.load())return false;slots[pos] = std::move(arg);writepos.store(next);return true;}bool pop(T& result){auto pos = readpos.load();if(pos == writepos.load())return false;result = std::move(slots[pos]);readpos.store((pos + 1) % slots.size());return true;}private:std::vector<T> slots = {};std::atomic<int> readpos = {0};std::atomic<int> writepos = {0};
};

FIFO 有非常非常多不同的版本,应用的场景也不同,那么要如何决定选用哪种类型的 FIFO 呢?你只需要问自己两个问题:

  1. 是否会有多个线程同时向 FIFO 中读/写数据?
  2. 如果写的时候 FIFO 是满的,读的时候是空的,会发生什么?

站在生产者的角度来看,它可以是单生产者写入 FIFO,也可以是多生产者多个线程同时写入;当 FIFO 满了后,继续写入可以是强制覆盖,也可以是告知生产者写入失败。

站在消费者的角度来看,它可以是单消费者读取 FIFO 数据,也可以是多个消费者多个线程同时读取;当 FIFO 为空后,继续读取数据可以是返回 null,也可是告知消费者读取失败。

举个几个例子来说明如何进行选择。

例子1,假设你正在写一个实时显示音频波形的是程序,音频线程从麦克风获取音频,并将音频数据发送给 UI 线程。这种情况下,因为只有音频线程在生产数据,只有 UI 线程在消费数据,因此肯定采用的是单生产者-单消费者;此外,实时线程在不断的生产数据,UI 线程通常只要考虑显示最新的音频数据,因此当 FIFO 满后,直接强制覆盖没有任何影响;但 FIFO 如果为空,有可能是音频线程出现了问题,因此最好还是告知 UI 线程读取数据失败。基于以上分析,应该选择单生产者、单消费者、队列满时写入覆盖、队列空时读取失败的 FIFO。

例子2,假设你正在给一个高精度感应器完成异常值检测的功能,该仪器有多个感应器,它们同时向 FIFO 写入数据值,在消费线程中从 FIFO 中读取数据并检测是否有异常值。因此我们可以采用 多生产者-单消费者模式。由于异常值非常重要,当 FIFO 满后,你不能简单地强制覆盖,这会导致你错过异常值,因此合适的做法应该是告知写入失败。另一方面,当 FIFO 为空时,可以认为此时没有异常值,因此返回 null/0 是合适的。基于以上分析,应该选择多生产者、单消费者、队列满时写入失败、队列空时读返回 null/0 的 FIFO。

根据上面两个问题的答案,排列组合可以得到共有 16 中 FIFO,它们对读写线程 wait-free 的支持程度是不同的,具体如下表:

比如例子1,单生产者、单消费者、队列满时写入覆盖、队列空时读取失败的 FIFO,读写两端都支持 wait-free;而例子2,多生产者、单消费者、队列满时写入失败、队列空时读返回 null/0 的 FIFO,只在读线程是 wait-free。farbot 提供了上述 16 种 FIFO。

FIFO的总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 在非实时与实时线程之间传递数据

代价

  • 固定的 FIFO 大小
  • 当 FIFO 满后的行为(阻塞/丢弃/覆盖)
  • FIFO 读写的开销

示例

  • 打印日志、写入数据到磁盘(录音)、文件读写等

Question 2: 是否与实时线程交互?

当你选择线程之间共享数据,那么再问自己一个问题:我是否正在与实现线程交互?如果答案是否定的,那么使用锁就能解决你的问题。
举个例子,下面的代码中一个线程接受事件,同时记录事件信息,另一个线程将事件信息写入文件。事件信息使用 std::queue 来存放,两个线程共享这个对列,同时使用 std::mutex 来同步数据。

#include <queue>
#include <mutex>
#include <algorithm>std::queue<string> string_queue; // 1
std::mutex some_mutex; // 2auto log_worker = [&] () {while(is_running()){std::lock_guard<std::mutex> guard(some_mutex);for(;!string_queue.empty();){print_log_to_file(string_queue.front());string_queue.pop();}}
};auto event_worker = [&] () {while (is_running()){wait_for_event();// log event informationstd::lock_guard<std::mutex> guard(some_mutex);string_queue.push(get_event_info());}
};auto log_future = std::async (std::launch::async, audio_worker);
auto event_future = std::async (std::launch::async, gui_worker);log_future.wait();
event_future.wait();

Question 3: 共享数据是否足够小?

如果你与实时线程共享的数据足够小,那么使用 std::atomic。举个具体的例子,如下:

auto gain = 1.0f;void processSensorData(float* sensorInOut, int n)
{// do some dsp...for(int i = 0; i < n; ++i)sensorInOut *= gain;
}// called on another thread
void setSensorGain(float newGain)
{gain = newGain;
}

上面的代码中 setSensorGain 对 gain 进行了修改,与 processSensorData 形成了数据争用(data race)。数据争用在 C/C++ 中是未定义行为,这导致你的代码行为令人难以捉摸,出现 bug 难以定位和排查。我们讨论其中一种可能,编译器可能假设在 processSensorData 中,gain 不会发生变化,因此代码可能被优化为:

void processSensorData(float* sensorInOut, int n)
{// do some dsp...regrester auto gain_copy = gain;for(int i = 0; i < n; ++i)sensorInOut *= gain_copy;
}

这种情况不算太糟糕,只是新参数真正起作用的时间被延后了。但考虑另一种情况,代码如下:

auto gain = 1.0f;void realtimeThreadEntry()
{while(rocketFlying()){processSensorData(sensorData, 512);}
}void processSensorData(float* sensorInOut, int n)
{// do some dsp...for(int i = 0; i < n; ++i)sensorInOut *= gain;
}// called on another thread
void setSensorGain(float newGain)
{gain = newGain;
}

上述代码中,realtimeThreadEntry 为实时线程入口,内部不停地在循环处理数据,由于 processSensorData 代码量相当小,编译器可能进行 inline 优化:

void realtimeThreadEntry()
{while(rocketFlying()){// do some dsp...for(int i = 0; i < n; ++i)sensorInOut *= gain;}
}

同样的,编译器可能假设 gain 不会发生变化,采用了之前的优化,于是代码变成了这样:

void realtimeThreadEntry()
{regrester auto gain_copy = gain;while(rocketFlying()){// do some dsp...for(int i = 0; i < n; ++i)sensorInOut *= gain_copy;}
}// called on another thread
void setSensorGain(float newGain)
{gain = newGain;
}

这样的优化导致 setSensorGain 无论如何都无法对 realtimeThreadEntry 起作用了。因为我们的代码存在数据争用,引入了未定义的行为,这就意味着任何事情都可能发生。我们要极力避免这种情况。

再来看一个例子,加深对于未定义行为的畏惧之心,假设你想要推翻费马大定理的证明,那么你可能写下面这样的代码,来遍历所有数字企图找到一个反例:

bool threadRunning;// thread 1
bool proveFermatsLastTheorem(){threadRunning = true;for(int n = 3; threadRunning; ++n){if(pow(x, n) + pow(y, n) == pow(z, n)){return false;}}return true;
}void testTheorem(){bool result;startThread([](){ result = proveFermatsLastTheorem() });Sleep(2000);threadRunning = false;cout << result << endl;
}

上述代码中,优化器可能认为 threadRunning 永远都是 true,为了减少每次都从内存中读取的耗时,它可能进行如下优化:

bool proveFermatsLastTheorem(){while(true){if(pow(x, n) + pow(y, n) == pow(z, n)){return false;}++n;}return true;
}

接着,编译器进一步优化,将真正的循环条件提前:

bool proveFermatsLastTheorem(){while(pow(x, n) + pow(y, n) != pow(z, n)) ++n;return false;return true;
}

显然,最后一句 return 是多余的:

bool proveFermatsLastTheorem(){while(pow(x, n) + pow(y, n) != pow(z, n)) ++n;return false;
}

至此,整个函数无论如何只会返回 false,因此最终优化的结果可能是:

bool proveFermatsLastTheorem(){return false;
}

回到我们的问题中来,如果你与实时线程共享的数据足够小,那么使用 std::atomic。它能够解决上面两个例子的未定义行为,保证线程共享数据之间的同步性,同时阻止优化器过渡优化。

需要注意的是,在使用 std::atomic 时,我们需要检查是否是 look-free 实现:

static_assert(std::atomic<float>::is_always_lock_free)

如果 ::is_always_lock_free 返回 false,那么它本质上还是使用了锁来同步数据,这是在实时线程编程中要避免的。

总结下关于 std::atomic 使用

使用场景

  • 多个线程可能对数据进行修改

代价

  • 数据必须足够小,要满足 std::atomic<float>::is_always_lock_free == true
  • 只允许某些操作,例如 store, load, fetch_add,
    fetch_add 等等

示例

  • 线程之间共享小数据
  • 增量控制量、电平值、算法参数等等

Question 4: 获取共享资源是否允许失败?

当我们需要共享一个较大的数据,即 ::is_always_lock_free 返回 false 时,考虑是否可以允许获取资源失败。如果允许,那么使用 try_lock。举个例子,在音频线程中使用查表法,在非实时线程中更新表。

class WavetabelSynthesizer
{
public:void audioCallback(){if(std::unique_lock<mutex> tryLock(mutex, std::try_to_lock); tryLock.owns_lock()){// Do something with wavetable}else{// Do something else as wavetabel is not available}}void updateWavetable(/* args */){auto newWavetable = std::make_unique<Wavetable>(/* args */);{std::lock_guard<std::mutex> lock(mutex);std::swap(wavetable, newWavetable);}}
private:std::mutex mutex;std::unique_ptr<Wavetable> wavetable;
}

上述代码中,我们使用 try_lock 来尝试获取 wavetable 的所有权,并对获取成功和失败两种情况分别进行不同的操作。

但有一个细节需要特别注意,代码中使用了 std::mutex ,它配合 std::unique_lock 进行 try_lock 操作。查看 c++ 文档,std::mutex::try_lock 是 wait-free 操作,这非常好,但如果获取锁成功,那么在 std::unique_lock 析构的时候会调用 std::mutex::unlock 去通知其他线程,而这会调用系统 api,这是我们需要避免的,因此这种情况下 std::mutex 是不合适的。

为此,我们可以使用 spin_lock,即自旋锁。下面给出一种最基本的 spin_lock 实现,它使用 std::atomic_flag 进行 lock, unlock 和 try_lock 的实现,没有任何系统 api 调用,不会造成 block。

class spin_lock
{
public:void lock() noexcept { while(flag.test_and_set()); }void unlock() noexcept { flag.clear(); }bool try_lock() noexcept { return !flag.test_and_set(); }private:std::atomic_flag flag = ATOMIC_FLAG_INIT;
}

try_lock 总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 允许获取资源失败

代价

  • 非实时线程获取资源时需要等待实时线程释放资源
  • 实时线程需要处理好资源获取失败的情况

示例

  • 将大型数据传递给实时线程供其使用
  • 音频采样数组、波形图数据、滤波器参数等

Question 5:实时线程会修改线程间共享的数据吗?

如果资源获取失败是不被允许的,即无法使用 try_lock,那么考虑一个问题:实时线程是否会对共享数据进行修改?如果答案是否定的,那么可以使用 CAS 循环(Compare and Set Loop)。一个经典的例子就是双二阶滤波器算法的实现:

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
BiquadCoeffecients coeffs;BiquadCoeffecients calculateLowPassCoeffecients(float freq);void audioThread(const float* src, float* dst, size_t n)
{static float lv1, lv2;for(size_t i = 0; i < n; ++i){auto input = src[i];auto output = (input * coeffs.b0) + lv1;lv1 = (input * coeffs.b1) - (output * coeffs.a1) + lv2;lv2 = (input * coeffs.b2) - (output * coeffs.a2);}
}void updateFrequecyParameter(float newValue)
{coeffs = calculateLowPassCoeffecients(newValue);
}

其中 audioThread 运行在实时线程中进行音频计算;updateFrequecyParameter 被其他线程调用,负责更新参数。

上述代码并没有对coeffs 进行多线程保护,这会导致未定义行为。在这个例子中,可能会偶尔出现 glitch 或者奇怪的杂音。

为了避免这种情况,一种最简单的方式是使用 std::atomic

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::atomic<BiquadCoeffecients> coeffs;void audioThread(const float* src, float* dst, size_t n)
{static float lv1, lv2;auto local_coeffs = coeffs.load();for(size_t i = 0; i < n; ++i){auto input = src[i];auto output = (input * local_coeffs.b0) + lv1;lv1 = (input * local_coeffs.b1) - (output * local_coeffs.a1) + lv2;lv2 = (input * local_coeffs.b2) - (output * local_coeffs.a2);}
}

但很遗憾,由于BiquadCoeffecients 包含 5 个 float,它太大了,以至于 std::atomic<BiquadCoeffecients> 并不是 lock free 的实现。因此这种情况下,我们不能使用 std::atomic<BiquadCoeffecients>

在引出 CAS 方法前,我们先考虑一种使用 std::atomic<BiquadCoeffecients*> 的方法,对上述代码做简单的修改:

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::atomic<BiquadCoeffecients*> coeffs;void audioThread(const float* src, float* dst, size_t n)
{auto* coeffsCopy = coeffs.copy();processBiquad(src, dst, n, coeffsCopy);
}void updateFrequecyParameter(float newValue)
{coeffs = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));
}

使用 std::atomic 来管理指针,保证其更新是多线程安全的。这种方式可以工作,但它会造成内存泄露,因为你无法知道 coeffs 是否在被使用。也许我们可以通过设置 flag 来标识 coeffs 是否在实时线程中被使用:

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
BiquadCoeffecients* coeffs;
std::atomic<bool> isInAudioThread{ false };void audioThread(const float* src, float* dst, size_t n)
{isInAudioThread = true;auto* coeffsCopy = coeffs.copy();processBiquad(src, dst, n, coeffsCopy);isInAudioThread = false;
}void updateFrequecyParameter(float newValue)
{auto* ptr = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));while(isInAudioThread.load());std::swap(ptr, coeffs);delete ptr;
}

上面代码中,我们加入 isInAudioThread flag 变量,在进行音频线程处理时将其设置为 true,退出时为 flase;在非实时线程中,如果 isInAudioThread 为 true,则循环等待实时线程退出,然后通过 swap 对数据指针进行更新,最后释放旧数据。

但这仍然存在问题,在检查资源是否被使用(即 while 循环)和更新资源之间,实时线程重新占用了资源,即isInAudioThread 发生变化。因此检查与更新不具有原子性,导致了所谓的 ABA 问题:

void updateFrequecyParameter(float newValue)
{auto* ptr = new BiquadCoeffecients(calculateLowPassCoeffecients(newValue));while(isInAudioThread.load());// isInAudioThread could be changed herestd::swap(ptr, coeffs);delete ptr;
}

最终的解决方案是结合上面两种方法,具体看下面代码:

struct BiquadCoeffecients { float b0, b1, b2, a1, a2;}
std::unique_ptr<BiquadCoeffecients> storage { std::make_unique<BiquadCoeffecients>() };
std::atomic<BiquadCoeffecients*> biquadCoeffs;void processAudio(float* buffer)
{auto* coeffs = biquadCoeffs.exchange(nullptr);processBiquad(*coeffs, buffer);biquadCoeffs = coeffs;
}void changeBiquadParameters(BiquadCoeffecients newCoeffs)
{auto newBiquad = std::make_unique<BiquadCoeffecients>(newCoeffs);for(auto* expected = storage.get(); !biquadCoeffs.compare_exchange_strong(expected, newBiquad.get()); expected = storage.get());storage = std::move(newBiquad);
}

我们使用 std::unique_ptr<BiquadCoeffecients> 存放参数数据资源,用 std::atomic<BiquadCoeffecients*> 指向当前可用资源。

在实时线程中,首先使用 exchange 返回当前可用资源指针的同时,将其置空,而置空则表示实时线程正在使用该资源。在实时线程完成处理后,biquadCoeffs = coeffs; 重新赋值资源指针。

在非实时线程中对参数进行更新,for 循环中关键代码 biquadCoeffs.compare_exchange_strong(expected, newBiquad.get()) 表示如果实时线程不再使用资源,那么就将资源指针更新;否则就自旋等待实时线程。最后通过 move 来更新资源。

这种方法无法处理实时线程更新数据的情况,下面代码为例,如果 b0 参数在实时线程进行了更新,那么非实时线程将无法得知这一更新,导致实时线程所有的参数更新就被丢失。

void processAudio(float* buffer)
{auto* coeffs = biquadCoeffs.exchange(nullptr);processBiquad(*coeffs, buffer);coeffs->b0 *= 2; // update data in real-time threadbiquadCoeffs = coeffs;
}void changeBiquadParameters(BiquadCoeffecients newCoeffs)
{auto newBiquad = std::make_unique<BiquadCoeffecients>(newCoeffs);for(auto* expected = storage.get(); !biquadCoeffs.compare_exchange_strong(expected, newBiquad.get()); expected = storage.get());storage = std::move(newBiquad);
}

非实时线程修改数据的总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 非实时线程可以修改数据
  • 实时线程总能获取到资源

代价

  • 实时线程不能修改数据
  • 非实时线程需等待实时线程释放资源
  • 在非实时线程上有复制的开销

示例

  • 非实时线程共享大型数据供实时线程使用
  • 音频采样数组、波形图数据、滤波器参数等

Question 6:非实时线程会修改线程间共享的数据吗?

如果实时线程需要更新数据,并且非实时线程不会修改数据,那么使用双缓冲策略。其中一个 buffer 用于实时线程,另一个用于非实时线程。它们满足:

  1. 两个 buffer 的数据都被预先正确的初始化
  2. 实时线程能够随时对数据进行更新
  3. 当非实时线程想要读取数据时,交换两个 buffer 的槽(slot)
  4. 实时线程能够继续更新数据,而非实时线程也能读取数据。

看下面这个例子:

using FrequencySpectrum = std::array<float, 512>;std::array<FrequencySpectrum,2> mostRecentSpectrum;
// idx denotes current slot of realtime thread, idx xor 1 denotes slot of non-realtime-thread
std::atomic<int> idx = {0};void processAudio(const float* buffer, size_t n)
{auto freqSpec = calcualteSpectrum(buffer, n);mostRecentSpectrum[idx.load()] = freqSpec;
}void updateSpectrumUIbuttonClicked()
{auto i = idx.fetch_xor(1);displaySpectrum(mostRecentSpectrum[i]);
}

我们用 idx 来表示对两个 buffer 的引用。实时线程更新数据时,只需要简单的引用 idx.load() 即可;而非实时线程读取数据时,需要做 fetch_xor 操作,进行交换 slots。

很简单的策略,但非常遗憾,它是有缺陷的。。这里举例说明这种情况:

  1. 开始时,两个 buffer 数据为:A - B
  2. 接着,实时线程更新数据:C - B
  3. 非实时线程获取数据第一次,进行 slots 交换:B - C
  4. 非实时线程获取数据第二次,进行 slots 交换:C - B

可以看到,如果 updateSpectrumUIbuttonClicked 连续被触发两次,它将使用错误的旧数据。因此需要引入额外的 flag 来标识实时是否更新了数据,例如:

using FrequencySpectrum = std::array<float, 512>;std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};
std::atomic<bool> hasNewData = false;void processAudio(const float* buffer, size_t n)
{auto freqSpec = calcualteSpectrum(buffer, n);mostRecentSpectrum[idx.load()] = freqSpec;hasNewData = true;
}void updateSpectrumUIbuttonClicked()
{if(hasNewData){auto i = idx.fetch_xor(1); // swap slot indexdisplaySpectrum(mostRecentSpectrum[i]);// hasNewData could be changed herehasNewData = false;}else{auto i = idx.load() ^ 1;displaySpectrum(mostRecentSpectrum[i]);}}

但引入两个 atomic 来解决一个问题,通常会导致 ABA 问题。在上面代码中,在 hasNewData = false 之前,实时线程可能对 hasNewData = true 的操作,而这个操作直接就被覆盖了,这就导致数据的丢失。

为了不引入 ABA 问题,我们采用一种曲线救国的办法:使用位运算。具体代码如下:

using FrequencySpectrum = std::array<float, 512>;enum {BIT_IDX = (1 << 0), BIT_NEWDATA = (1 << 1)};std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};bool hasNewData(int i)
{return (i & BIT_NEWDATA) != 0;
}int swapIndicesAndResetNewDataBit(int i)
{i = (i & BIT_IDX) ^ 1;return i;
}void processAudio(const float* buffer, size_t n)
{auto freqSpec = calcualteSpectrum(buffer, n);auto i = idx.load() & BIT_IDX;mostRecentSpectrum[i] = freqSpec;idx.store((i & BIT_INX) | BIT_NEWDATA);
}void updateSpectrumUIbuttonClicked()
{auto current = idx.load();if( hasNewData(current) ){current = swapIndicesAndResetNewDataBit(current);idx.store(current);}displaySpectrum(mostRecentSpectrum[(current & BIT_IDX) ^ 1]);
}

上述代码乍一看挺复杂,耐下心来看其实不复杂。它利用一个 std::atomic<int> 的位信息来表示多个状态,避免了 ABA 问题。从二进制的角度来看,状态有四种:

位信息 状态
0x00 无新数据;实时线程的 slot_index = 0
0x01 无新数据;实时线程的 slot_index = 1
0x10 有新数据;实时线程的 slot_index = 0
0x11 有新数据;实时线程的 slot_index = 1

可以看到,高位用于表示有无新数据更新,低位用于表示实时线程使用的 slot index。当实时线程更新数据时,通过 | BIT_NEWDATA 来设置高位 bit 的值;当非实时线程读取数据时,它将交换 slot index 同时重置高位 bit。

这种方法避免了 ABA 问题,但很遗憾它是错误的,虽然已经接近正确答案了。你能看出哪里有问题吗?答案是 mostRecentSpectrum 发生了数据竞争,我们以下面线程执行顺序为例

Realtime Thread Non-realtime Thread
void processAudio(...);
idx = 0x10
void processAudio(...)
{
i = idx & BIT_IDX = 0;
}
current = idx = 0x01
current = swapIndicesAndResetNewDataBit() = 0x01
(current & BIT_IDX) ^ 1 = 0

如果线程按照上述顺序执行,那么此时此刻,实时线程将往 0 号 buffer 写数据,同时非实时线程将从 0 号 buffer 读数据,这就引入了对于 0 号 buffer 的数据竞争问题。

OK 为了避免这种问题,需要再额外引入一个 bit 位,BUSY_BIT。当 BUSY_BIT 被设置位 1 时,表示实时线程正在在使用某个 slot index,非实时线程则需要等待 BUSY_BIT 为 0 才去重置 idx。具体看下面的代码:

using FrequencySpectrum = std::array<float, 512>;enum {BIT_IDX = (1 << 0), BIT_NEWDATA = (1 << 1), BUSY_BIT = (1 << 2)};std::array<FrequencySpectrum,2> mostRecentSpectrum;
std::atomic<int> idx = {0};bool hasNewData(int i)
{return (i & BIT_NEWDATA) != 0;
}void processAudio(const float* buffer, size_t n)
{auto freqSpec = calcualteSpectrum(buffer, n);auto i = idx.fetch_or(BIT_BUSY) & BIT_IDX;mostRecentSpectrum[i] = freqSpec;idx.store((i & BIT_INX) | BIT_NEWDATA);
}void updateSpectrumUIbuttonClicked()
{auto current = idx.load();if( hasNewData(current) ){int newValue;do{current &= ~BIT_BUSY;newValue = (current ^ BIT_IDX) & BIT_IDX;}while(!idx.compare_exchange_weak(current, newVaule))current = newValue;}displaySpectrum(mostRecentSpectrum[(current & BIT_IDX) ^ 1]);
}

好吧,这实在是有些复杂,好在 Fabian Renn-Giles & Dave Rowland 开源了 farbot 简化了这些复杂的操作。使用 farbot 后代码如下:

using FrequencySpectrum = std::array<float, 512>;
RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable> mostRecentSpectrum;/* called on realtime thread */
void processAudio (const float* buffer, size_t n) {RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> freqSpec(mostRecentSpectrum);*freqSpec = calculateSpectrum (buffer, n);
}/* called on non-realtime thread */
void updateSpectrumUIButtonClicked() {RealtimeObject<FrequencySpectrum, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::nonRealtime> recentSpectrum(mostRecentSpectrum);displaySpectrum(*recentSpectrum);
}

实时线程修改数据的总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 实时线程可以修改数据
  • 实时线程总能获取到资源

代价

  • 非实时线程不能修改数据
  • 非实时线程需等待实时线程释放资源
  • 在 实时/非实时线程 上有复制的开销

示例

  • 实时线程共享大型数据供非实时线程使用
  • GUI 数据可视化,频谱图,示波器等

Question 7: 实时线程和非实时线程都会修改数据,这该怎么办?

最后一种情况,也是最复杂的情况,当实时/非实时线程都会修改数据时,

接下来要介绍 callAsync 机制。如果你想在实时线程做一些非实时线程安全的事情,例如打印 log、读取文件等等,我们可以采取这样一种策略:将这些任务延缓执行,将其放置到某个 FIFO 中,之后再由非实时线程去执行它们。

在 farbot 中提供了 AsyncCaller 来实现 callAsync。具体代码如下:

class AsyncCaller{
public:void callAsycn(std::function<void()>&& lambda){auto success = queue.push(std::move(lambda));assert(success);}void process(){std::function<void()> lambda;while(queue.pop(lambda))lambda();}
private:fifo<std::function<void()>> queue;
}AsyncCaller messageThreadExecutor;void timerCallback(){messageThreadExecutor.process();
}messageThreadExecutor.callAsync([](){ cout << "hello world" << endl; });

上述代码中:

  1. 实时线程中调用 callAsycn 将 lambda 函数 push 到 wait-free FIFO 中。这里需要保证 lambda 是 real-time movable。关于 real-time movable,farbot 提供了方便的工具来帮助我们判断。
  2. 在 JUCE 中也有类似 AsyncCaller 的工具,叫 AsyncUpdater。在其实现中,AsyncUpdater 将任务 push 到队列后,然后去唤醒另一个线程来执行任务。而唤醒线程是系统调用,它不是实现线程安全的操作。
  3. 非实时线程中,调用 process 从 FIFO 中 pop 出 lambda 函数,并执行它。具体实现中,可以通过设置定时器(timer)来间歇性执行 process 函数。

介绍完 AsyncCaller 后,让我们进入最后也是最困难的一中情况:实时线程和非实时线程都会修改数据。作者之言,他认为这种情况下无法做到 wait-free,但可以曲线救国,选定一个线程负责执行修改数据的操作,其他线程只要把修改数据命令发送到该线程即可。

举个例子,例如在音频线程中去 mix 音频,用户可以在 UI 线程增加或者删除音源,音频线程也可以增加或者删除音源。我们选择音频负责执行数据修改,因为
当实时线程中发生修改事件时,能够立马得到处理。

伪代码如下:

struct SourceList{std::array<const float*, MAX_SOURCES> buffers = {};int numSources = 0;
};class Mixer
{
public:void mixAllSources(float* output, char* realtimeEventMessage, int n){processRealtimeEvents(realtimeEventMessage); // may add and remove sourcesrealtimeThreadCaller.process(); // process all the lambdasRealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> sourceList(sharedSourceList);for(int i = 0; i < sourceList->numSources; ++i)mixSource(output, sourceList->buffers[i]);}void addSource(const float* src){if(!isRealtimeThread()){realtimeThreadCaller.callAsync([src]( addSource(src); ));return;}RealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable>::ScopedAccess<ThreadType::realtime> sourceList(sharedSourceList);assert(sourceList->numSources < MAX_SOURCES);sourceList->buffers[sourceList->numSources++] = src;}private:RealtimeObject<SourceList, RealtimeObjectOptions::realtimeMutatable> sharedSourceList;AsyncCaller realtimeThreadCaller;}Mixer mixer;// audio thread
while(audioThreadIsRuning()){if(comesNewSource()){mixer.mixAllSources(output, "Add Source", num_samples);}else{mixer.mixAllSources(output, nullptr, num_samples);}
}// UI thread
mixer.addSource(...)

上述代码中,

  1. mixAllSources 在进行 mix 音频数据之前,调用 realtimeThreadCaller.process() 去执行修改数据的任务。
  2. addSource 如果是非实时线程调用,那么通过callAsync 函数将任务 push 到 FIFO 中;如果是实时线程调用,那么立刻马上修改sourceList 数据。

实时/非实时线程修改数据的总结:

使用场景

  • 数据比较大,std::atomic<float>::is_always_lock_free == false
  • 实时线程与非实时线程共享数据
  • 实时/非实时线程都能修改数据

代价

  • 其中一个线程需要持有数据
  • 同时具有 FIFO 和实时/非实时线程修改数据的代价
  • 实现复杂

示例

  • 管理动态音频流,其中数据包丢失是不可接受的

总结

最后用一张图总结

实时音频编程(二):实践与技巧相关推荐

  1. 一本需要购买的图形学方面的好书:GPU精粹——实时图形编程的技术、技巧和技艺(附CD-ROM光盘一张)  ...

    GPU精粹--实时图形编程的技术.技巧和技艺(附CD-ROM光盘一张) 内容提要: 本书由引领全球计算机图形芯片技术发展的NVIDIA公司组稿,汇集当今国际上前沿开发者们经多年研究和实践得出的实用的实 ...

  2. 实时音频混音技术在视频直播中的实践应用

    作者:冼牛 转自:前端之巅 最近半年,视频直播领域中产生不少创新玩法,其中包括 K 歌直播和合唱直播.这些创新玩法都用到实时音频混音技术.今天我们来聊一下混音技术的实现,及其在创新玩法中的应用. 混音 ...

  3. 实时音频混音技术在视频直播场景中的实践

    最近半年,视频直播领域中产生不少创新玩法,其中包括K歌直播和合唱直播.这些创新玩法都用到实时音频混音技术.今天我们来聊一下混音技术的实现,及其在创新玩法中的应用. \\ 混音的应用场景 \\ 混音,顾 ...

  4. iOS音频编程之实时语音通信

    http://blog.csdn.net/it_yangjing/article/details/51909991 在CSDN上显示的代码格式不全,在github blog地址显示正确 iOS音频编程 ...

  5. Flink从入门到精通100篇(二十三)-基于Apache Flink的爱奇艺实时计算平台建设实践

    前言 随着大数据的快速发展,行业大数据服务越来越重要.同时,对大数据实时计算的要求也越来越高.今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践. 今天的介绍会围绕下面三点展开 ...

  6. linux文件系统添加pcm,嵌入式linux中PCM音频编程实践

    嵌入式设备中经常需要用的音频,音频设备最原始的数据格式就是PCM,也就是大家常见的WAV,在linux中,音频编程使用最多的就是alsa框架,下面就来看一下pcm音频的编程实例吧. 首先需要包含头文件 ...

  7. 网易云信实时音频框架背后:算法优化带来产品体验全面提升

    2018年10月19日,LiveVideoStackCon音视频技术大会在北京召开.本届会议以"技术开启新'视'界"为主题,汇集资深的音视频技术工程师,探讨在音频.视频.图像等技术 ...

  8. 实时流媒体编程基于Linux环境开发

    一.流媒体简介 随着Internet的日益普及,在网络上传输的数据已经不再局限于文字和图形,而是逐渐向声音和视频等多媒体格式过渡.目前在网络上传输音频/视频(Audio/Video,简称A/V)等多媒 ...

  9. Linux下的实时流媒体编程

    流媒体指的是在网络中使用流技术传输的连续时基媒体,其特点是在播放前不需要下载整个文件,而是采用边下载边播放的方式,它是视 频会议.IP电话等应用场合的技术基础.RTP是进行实时流媒体传输的标准协议和关 ...

最新文章

  1. 使用PowerShell登陆多台Windows,测试DCAgent方法
  2. Hibernate缓存
  3. 重磅!苹果祭出大招:史上最强 Mac 发布,iPad OS 惊艳问世
  4. oracle if后面为null,Oracle中NVL2 和NULLIF的用法
  5. 结合“性能监视器” 排查、处理性能瓶颈导致应用吞吐率等指标上不去的问题...
  6. input中的disabled 和 readonly的区别
  7. Kraken发言人:公司可能会考虑明年上市,但不适合SPAC模式
  8. WPS入门StackPanel与Grid
  9. 【定位】TOF与TDOA
  10. 【电商数仓】数仓即席查询之Kylin简介,安装和使用
  11. 实验8.1 时间换算
  12. moses中的数据预处理预处理操作
  13. 006 window7或虚拟机上不了网或DNS出问题
  14. KeyShot 11 Pro for Mac(3D渲染和动画制作) V11.3.2.2中文安装+更新内容
  15. React学习总结(一)
  16. 洛谷 P2181对角线——排列组合
  17. 国内软件好压,能够打开各种exe安装包,并直接解压安装 - 国内软件质量测评
  18. Vue3 高级语法以及自定义组件
  19. 树莓派linux优酷软件,树莓派安装OSMC打造家庭影院,还能够看优酷和CCTV
  20. iPhone手机 手机设备号和手机串号imsi

热门文章

  1. multisim高频小信号放大器_什么是放大器及其用处_光纤传感器_传感器
  2. mysql 获取操作系统信息_php获取服务器操作系统相关信息的方法
  3. ideal如何快速导入import_【MAC版】pr预设安装目录?pr如何快速批量导入lut
  4. oracle 更新flwid,Oracle分析函数RANK(),ROW_NUMBER(),LAG()等的使用方法
  5. 计算机基础术语巧记,报考28个专业术语,你都知道吗?掌握这些才算入门!
  6. 中加学校计算机考试题,嘉应学院2009年计算机期末考试试题
  7. python简单的编程_简单的Python2.7编程初学经验总结
  8. 语言运算顺序题目_我的Python学习笔记:今天我学了关于Python里的运算符及运算顺序...
  9. l2tp连接尝试失败 因为安全层在初始化_线程安全互斥锁
  10. java 字符串排列_Java实现字符串的全排列