本文介绍如何在应用程序中实现生产者-使用者模式。 在此模式下,制造者向消息块发送消息,使用者从该块读取消息。
本文演示了两种方案。 在第一种方案中,使用者必须接收生产者发送的每条消息。 在第二种方案中,使用者定期轮询数据,因此不必接收每条消息。
本文中的这两个示例都使用代理、消息块和消息传递函数将消息从生产者传输到使用者。 生产者代理使用 concurrency::send 函数将消息写入 concurrency::ITarget 对象。 使用者代理使用 concurrency::receive 函数从 concurrency::ISource 对象读取消息。 两个代理都持有一个 sentinel 值来协调处理结束。
示例:向使用者代理发送一系列数字
在此示例中,生产者代理向使用者代理发送一系列数字。 使用者将接收每一个数字并计算其平均值。 应用程序将平均值写入控制台。
此示例使用 concurrency::unbounded_buffer 对象,使生产者能够对消息进行排队。 unbounded_buffer 类实现 ITarget 和 ISource,以便生产者和使用者可以通过共享的缓冲区发送和接收消息。 send 和 receive 函数协调将数据从生产者传播到使用者的任务。
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>using namespace concurrency;
using namespace std;// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel): _target(target), _count(count), _sentinel(sentinel){}
protected:void run(){// Send the value of each loop iteration to the target buffer.while (_count > 0){send(_target, static_cast<int>(_count));--_count;}// Send the sentinel value.send(_target, _sentinel);// Set the agent to the finished state.done();}
private:// The target buffer to write to.ITarget<int>& _target;// The number of values to send.unsigned int _count;// The sentinel value, which informs the consumer agent to stop processing.int _sentinel;
};// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:explicit consumer_agent(ISource<int>& source, int sentinel): _source(source), _sentinel(sentinel){}// Retrieves the average of all received values.int average(){return receive(_average);}
protected:void run(){// The sum of all values.int sum = 0;// The count of values received.int count = 0;// Read from the source block until we receive the // sentinel value.int n;while ((n = receive(_source)) != _sentinel){sum += n;++count;}// Write the average to the message buffer.send(_average, sum / count);// Set the agent to the finished state.done();}
private:// The source buffer to read from.ISource<int>& _source;// The sentinel value, which informs the agent to stop processing.int _sentinel;// Holds the average of all received values.single_assignment<int> _average;
};int wmain()
{// Informs the consumer agent to stop processing.const int sentinel = 0;// The number of values for the producer agent to send.const unsigned int count = 100;// A message buffer that is shared by the agents.unbounded_buffer<int> buffer;// Create and start the producer and consumer agents.producer_agent producer(buffer, count, sentinel);consumer_agent consumer(buffer, sentinel);producer.start();consumer.start();// Wait for the agents to finish.agent::wait(&producer);agent::wait(&consumer);// Print the average.wcout << L"The average is " << consumer.average() << L'.' << endl;
}
本示例生成以下输出。
The average is 50.
示例:向使用者代理发送一系列股票报价
在此示例中,生产者代理向使用者代理发送一系列股票报价。 使用者代理定期读取当前报价并将其打印到控制台。
此示例与之前的示例类似,不同之处在于它使用 concurrency::overwrite_buffer 对象,使生产者能够与使用者共享一条消息。 与之前的示例中一样,overwrite_buffer 类实现 ITarget 和 ISource,以便生产者和使用者可以对共享的消息缓冲区执行操作。
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>using namespace concurrency;
using namespace std;// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:explicit producer_agent(ITarget<double>& target): _target(target){}
protected:void run(){// For illustration, create a predefined array of stock quotes. // A real-world application would read these from an external source, // such as a network connection or a database.array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };// Send each quote to the target buffer.for_each (begin(quotes), end(quotes), [&] (double quote) { send(_target, quote);// Pause before sending the next quote.concurrency::wait(20);});// Send a negative value to indicate the end of processing.send(_target, -1.0);// Set the agent to the finished state.done();}
private:// The target buffer to write to.ITarget<double>& _target;
};// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:explicit consumer_agent(ISource<double>& source): _source(source) {}protected:void run(){// Read quotes from the source buffer until we receive// a negative value.double quote;while ((quote = receive(_source)) >= 0.0){// Print the quote.wcout.setf(ios::fixed);wcout.precision(2);wcout << L"Current quote is " << quote << L'.' << endl;// Pause before reading the next quote.concurrency::wait(10);}// Set the agent to the finished state.done();}
private:// The source buffer to read from.ISource<double>& _source;
};int wmain()
{// A message buffer that is shared by the agents.overwrite_buffer<double> buffer;// Create and start the producer and consumer agents.producer_agent producer(buffer);consumer_agent consumer(buffer);producer.start();consumer.start();// Wait for the agents to finish.agent::wait(&producer);agent::wait(&consumer);
}
此示例生成以下示例输出。
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
与 unbounded_buffer 对象不同,receive 函数不会从 overwrite_buffer 对象中删除消息。 如果使用者在生产者覆盖消息之前多次从消息缓冲区读取,则接收方每次都会获得相同的消息。