目录
01_wait_group.cc
02_http_wait.cc
03_httptask_callback.cc
04_http_task_request.cc
05_redis_task_callback.cc
06_redistask_read.cc
07_series.cc
08_series_dynamic.cc
09_context.cc
10_parallel_work.cc
作业:
01 存在下列的redis键值映射关系,使用workflow的redis任务和序列,假如只知道"x1",如何找到最终的"100"?"x1" --> "x2""x2" --> "x3""x3" --> "x4""x4" --> "100"
02 读取某个网站的内容,并且存入redis服务端当中(比如先访问淘宝,再set www.taobao.com 淘宝的html内容)
03 阅读下面的代码并尝试添加注释
01_wait_group.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;static WFFacilities::WaitGroup WaitGroup(1);void handler(int signum){cout<<"done\n";WaitGroup.done();
}int main(void)
{signal(SIGINT,handler);//创建任务WFHttpTask * httpTask=WFTaskFactory::create_http_task(/* "http://www.baidu.com", */
"http://localhost/en/index.html",10,10,nullptr);//交给框架执行httpTask->start();WaitGroup.wait();cout<<"finish\n";return 0;
}/* static WFHttpTask *create_http_task(const std::string& url, *//* int redirect_max,//最大重定向次数 *//* int retry_max, //最大重试次数*//* http_callback_t callback//回调函数); */
02_http_wait.cc
// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){cout << "done\n";waitGroup.done();
}
int main(){signal(SIGINT,handler);// 用户代码 1 创建任务WFHttpTask * httpTask = WFTaskFactory::create_http_task(//"http://www.baidu.com","http://localhost/en/index.html",10,10,nullptr);// 用户代码 2 把任务交给框架httpTask->start();waitGroup.wait();cout << "finish!\n";
}
03_httptask_callback.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/HttpUtil.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
// httpCallback()
//------------------------------------//
void httpCallback(WFHttpTask* httpTask){cout<<"callback is called\n";protocol::HttpRequest* req=httpTask->get_req();protocol::HttpResponse* resq=httpTask->get_resp();int state=httpTask->get_state();int error=httpTask->get_error();//错误原因//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------////请求报文-起始行-首部字段cerr<<"method="<<req->get_method()<<"\n";cerr<<"version="<<req->get_http_version()<<"\n";cerr<<"path& query="<<req->get_request_uri()<<"\n";std::string name;std::string value;protocol::HttpHeaderCursor req_cursor(req);//类似迭代器,初始化while(req_cursor.next(name,value)){cerr<<"name ="<<name<<" value ="<<value<<"\n";}cerr<<"\n";//------------------------------------////响应报文-起始行-首部字段-报文体cerr<<"version="<<resq->get_http_version()<<"\n";cerr<<"state code="<<resq->get_status_code()<<"\n";cerr<<"reason phrea="<<resq->get_reason_phrase()<<"\n";protocol::HttpHeaderCursor resp_cursor(resq);//类似迭代器,初始化while(resp_cursor.next(name,value)){cerr<<"name ="<<name<<" value ="<<value<<"\n";}const void * body;size_t body_len;resq->get_parsed_body(&body,&body_len);cerr<<static_cast<const char*>(body)<<"\n";}//------------------------------------//
// main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFHttpTask* httpTask=WFTaskFactory::create_http_task("http://www.taobao.com",10,10,httpCallback);httpTask->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}
04_http_task_request.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/HttpMessage.h>
#include <workflow/HttpUtil.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
// httpCallback()
//------------------------------------//
void httpCallback(WFHttpTask* httpTask){cout<<"callback is called\n";protocol::HttpRequest* req=httpTask->get_req();protocol::HttpResponse* resq=httpTask->get_resp();int state=httpTask->get_state();int error=httpTask->get_error();//错误原因//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------////请求报文-起始行-首部字段cerr<<"method="<<req->get_method()<<"\n";cerr<<"version="<<req->get_http_version()<<"\n";cerr<<"path& query="<<req->get_request_uri()<<"\n";std::string name;std::string value;protocol::HttpHeaderCursor req_cursor(req);//类似迭代器,初始化while(req_cursor.next(name,value)){cerr<<"name ="<<name<<" value ="<<value<<"\n";}cerr<<"\n";//------------------------------------////响应报文-起始行-首部字段-报文体cerr<<"version="<<resq->get_http_version()<<"\n";cerr<<"state code="<<resq->get_status_code()<<"\n";cerr<<"reason phrea="<<resq->get_reason_phrase()<<"\n";protocol::HttpHeaderCursor resp_cursor(resq);//类似迭代器,初始化while(resp_cursor.next(name,value)){cerr<<"name ="<<name<<" value ="<<value<<"\n";}/* const void * body; *//* size_t body_len; *//* resq->get_parsed_body(&body,&body_len); *//* cerr<<static_cast<const char*>(body)<<"\n"; *//* GET: 通常用于请求数据,不会对服务器的状态产生副作用。 */
/* POST: 用于提交数据,通常会导致服务器状态改变或者创建新的资源。 */
/* 百度的接口: *//* 百度的搜索接口一般是通过 GET 请求来获取搜索结果。因此,即使您将方法设置为 POST,如果服务器只支持 GET,请求仍然会被处理为 GET。 */}//------------------------------------//
// main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFHttpTask* httpTask=WFTaskFactory::create_http_task("http://www.taobao.com",10,10,httpCallback);//找到请求并设置残数protocol::HttpRequest* req=httpTask->get_req();req->set_method("POST");req->set_request_uri("/s?wd=123");//百度查询接口req->add_header_pair("myname","workflow");cerr<<"method="<<req->get_method()<<"\n";cerr<<"version="<<req->get_http_version()<<"\n";cerr<<"path& query="<<req->get_request_uri()<<"\n";cerr<<"\n";cerr<<"\n";httpTask->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}
05_redis_task_callback.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
// redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){cout<<"callback is called\n";protocol::RedisRequest *req=redistask->get_req();protocol::RedisResponse* resp=redistask->get_resp();int state=redistask->get_state();int error=redistask->get_error();//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}cout<<"callback is end\n";
}//------------------------------------//
// main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);cerr<<"\n";//找到修改请求protocol::RedisRequest *req=redistask->get_req();req->set_request("SET",{"huasheng","lovexixi"});redistask->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}
06_redistask_read.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
// redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){cout<<"callback is called\n";protocol::RedisRequest *req=redistask->get_req();protocol::RedisResponse* resp=redistask->get_resp();int state=redistask->get_state();int error=redistask->get_error();protocol::RedisValue val;//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功resp->get_result(val);if(val.is_error()){cerr<<"error reply,need a password? \n";state=WFT_STATE_TASK_ERROR;}break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------////查看redis执行的结果/* protocol::RedisValue val; *//* resp->get_result(val); */if(val.is_string()){cerr<<"value is string: "<<val.string_value()<<"\n";}if(val.is_array()){cerr<<"value is array: \n ";for(size_t i=0;i<val.arr_size();++i){cerr<<i<<" value:"<<val.arr_at(i).string_value()<<"\n";}}cout<<"callback is end\n";
}//------------------------------------//
// main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);cerr<<"\n";//找到修改请求protocol::RedisRequest *req=redistask->get_req();/* req->set_request("SET",{"huasheng","lovexixi"}); */req->set_request("HGETALL",{"aa"});redistask->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}
07_series.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>#include <signal.h>
#include <unistd.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
// redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){cout<<"callback is called\n";protocol::RedisRequest *req=redistask->get_req();protocol::RedisResponse* resp=redistask->get_resp();int state=redistask->get_state();int error=redistask->get_error();protocol::RedisValue val;//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功resp->get_result(val);if(val.is_error()){cerr<<"error reply,need a password? \n";state=WFT_STATE_TASK_ERROR;}break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------////查看redis执行的结果/* protocol::RedisValue val; *//* resp->get_result(val); */if(val.is_string()){cerr<<"value is string: "<<val.string_value()<<"\n";}if(val.is_array()){cerr<<"value is array: \n ";for(size_t i=0;i<val.arr_size();++i){cerr<<i<<" value:"<<val.arr_at(i).string_value()<<"\n";}}/* sleep(2); */cout<<"callback is end\n";
}//------------------------------------//
// main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);protocol::RedisRequest *req=redistask->get_req();req->set_request("SET",{"07dada","07lovexixi"});WFRedisTask* redistask1=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);protocol::RedisRequest *req1=redistask1->get_req();req1->set_request("GET",{"07dada"});/* redistask->start(); *//* redistask1->start(); *///没有固定先后顺序SeriesWork* series=Workflow::create_series_work(redistask,nullptr);series->push_back(redistask1);series->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}
08_series_dynamic.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>#include <signal.h>
#include <unistd.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
// redisCallback()
//------------------------------------//
void redisCallback1(WFRedisTask* redistask){cout<<"callback111\n";
}void redisCallback(WFRedisTask* redistask){cout<<"callback is called\n";protocol::RedisRequest *req=redistask->get_req();protocol::RedisResponse* resp=redistask->get_resp();int state=redistask->get_state();int error=redistask->get_error();protocol::RedisValue val;//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功resp->get_result(val);if(val.is_error()){cerr<<"error reply,need a password? \n";state=WFT_STATE_TASK_ERROR;}break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------//if(val.is_string()){cerr<<"value is string: "<<val.string_value()<<"\n";//在正在执行的任务队列中添加任务WFRedisTask* redistask1=WFTaskFactory::create_redis_task(/* "redis://127.0.0.1:6379",10,redisCallback); */"redis://127.0.0.1:6379",10,redisCallback1);//递归调用redistask1->get_req()->set_request("SET",{"07dada","07lovexixi"});series_of(redistask)->push_back(redistask1);}if(val.is_array()){cerr<<"value is array: \n ";for(size_t i=0;i<val.arr_size();++i){cerr<<i<<" value:"<<val.arr_at(i).string_value()<<"\n";}}/* sleep(2); */cout<<"callback is end\n";
}//------------------------------------//
// main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask* redistask1=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);protocol::RedisRequest *req1=redistask1->get_req();req1->set_request("GET",{"07dada"});/* redistask->start(); */redistask1->start();//没有固定先后顺序/* SeriesWork* series=Workflow::create_series_work(redistask,nullptr); *//* series->push_back(redistask1); *//* series->start(); */waitGroup.wait();cerr<<"\nfinish\n";return 0;
}
09_context.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>#include <signal.h>
#include <unistd.h>
#include <iostream>using std::cout;
using std::cerr;
using std::string;
using std::vector;//------------------------------------//
struct SeriesContext{int id;std::string name;
};
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
// redisCallback()
//------------------------------------//
void redisCallback1(WFRedisTask *redisTask){cerr << "xixi 1 begin\n";SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());cerr << "before id = " << context->id << " name = " << context->name << "\n";context->id = 1001;context->name = "task1";cerr << "after id = " << context->id << " name = " << context->name << "\n";cerr << "xixi 1 end!\n";
}
void redisCallback2(WFRedisTask *redisTask){cerr << "xixi 2 begin\n";SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());cerr << "before id = " << context->id << " name = " << context->name << "\n";context->id = 1002;context->name = "task2";cerr << "after id = " << context->id << " name = " << context->name << "\n";cerr << "xixi 2 end!\n";
}
void redisCallback3(WFRedisTask *redisTask){cerr << "xixi 3 begin\n";SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());cerr << "before id = " << context->id << " name = " << context->name << "\n";cerr << "xixi 3 end!\n";
}void Callback(const SeriesWork* series){SeriesContext* context=static_cast<SeriesContext*>(series->get_context());cerr<<"callback id ="<<context->id<<" name="<<context->name<<"\n";delete context;
}//------------------------------------//
// main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask * redisTask1 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback1);redisTask1->get_req()->set_request("SET",{"key","123"});WFRedisTask * redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);redisTask2->get_req()->set_request("SET",{"key","123"});WFRedisTask * redisTask3 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback3);redisTask3->get_req()->set_request("SET",{"key","123"});SeriesWork* series=Workflow::create_series_work(redisTask1,nullptr);series->push_back(redisTask2);/* series->push_back(redisTask3); */SeriesContext* context=new SeriesContext({1000,"mian"});series->set_context(context);series->set_callback([context](const SeriesWork*series){cerr<<"callback id ="<<context->id<<" name="<<context->name<<"\n";delete context;});series->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}
10_parallel_work.cc
// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
using std::vector;
using std::string;
struct SeriesContext{string url;size_t body_len;
};
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){cout << "done\n";waitGroup.done();
}void httpCallback(WFHttpTask *httpTask){protocol::HttpResponse *resp = httpTask->get_resp(); // 获取响应int state = httpTask->get_state(); // 获取状态int error = httpTask->get_error(); // 获取错误原因switch (state){case WFT_STATE_SYS_ERROR:cerr <<"system error: " << strerror(error) << "\n";break;case WFT_STATE_DNS_ERROR:cerr <<"DNS error: " << gai_strerror(error) << "\n";break;case WFT_STATE_SSL_ERROR:cerr <<"SSL error\n";break;case WFT_STATE_TASK_ERROR:cerr <<"Task error\n";break;case WFT_STATE_SUCCESS:break;}if (state != WFT_STATE_SUCCESS){cerr <<"Failed. Press Ctrl-C to exit.\n";return;}const void *body;size_t body_len;resp->get_parsed_body(&body, &body_len); // get_parsed_body找到响应报文的报文体SeriesContext * context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());context->body_len = body_len;cerr << "url = " << context->url << ", len = " << context->body_len << "\n";
}
void parallelCallback(const ParallelWork * parallelWork){cerr << "parallel callback\n";string name;size_t body_len = 0;for(int i = 0; i < 3; ++i){// 找到内部的(已经执行完成不可修改的)序列const SeriesWork * series = parallelWork->series_at(i);SeriesContext * context = static_cast<SeriesContext *>(series->get_context());cerr << "i = " << i << "url = " << context->url << ", len = " << context->body_len << "\n";if(body_len < context->body_len){body_len = context->body_len;name = context->url;}delete context;}cerr << "longest body_len url = " << name << " body_len = " << body_len <<"\n";WFRedisTask * redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,nullptr);redisTask->get_req()->set_request("SET",{name,std::to_string(body_len)});series_of(parallelWork)->push_back(redisTask);
}
int main(){signal(SIGINT,handler);// 创建一个空的并行任务ParallelWork * parallelWork = Workflow::create_parallel_work(parallelCallback);// 创建多个小序列vector<string> urls = {"http://www.taobao.com","http://www.jd.com","http://www.baidu.com"};for(int i = 0; i < 3; ++i){// 创建一个http任务WFHttpTask * httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback);// 根据http任务创建小序列SeriesWork * series = Workflow::create_series_work(httpTask,nullptr);// 往序列中加一个contextSeriesContext *context = new SeriesContext;context->url = urls[i];series->set_context(context);// 把小序列加入并行任务parallelWork->add_series(series);}parallelWork->start();waitGroup.wait();cout << "finish!\n";
}
作业:
01 存在下列的redis键值映射关系,使用workflow的redis任务和序列,假如只知道"x1",如何找到最终的"100"?"x1" --> "x2""x2" --> "x3""x3" --> "x4""x4" --> "100"
// wait_group 实现有条件的等待 #include <workflow/WFFacilities.h> #include <workflow/WFTaskFactory.h>#include <iostream> #include <signal.h> using std::cout; using std::cerr; static WFFacilities::WaitGroup waitGroup(1); void handler(int signum){cout << "done\n";waitGroup.done(); } void redisCallback(WFRedisTask *redisTask) {protocol::RedisRequest *req = redisTask->get_req();protocol::RedisResponse *resp = redisTask->get_resp();int state = redisTask->get_state();int error = redisTask->get_error();// val用来保存redis执行的结果protocol::RedisValue val;switch (state){case WFT_STATE_SYS_ERROR:cerr <<"system error: " << strerror(error) << "\n";break;case WFT_STATE_DNS_ERROR:cerr <<"DNS error: " << gai_strerror(error) << "\n";break;case WFT_STATE_SSL_ERROR:cerr <<"SSL error\n";break;case WFT_STATE_TASK_ERROR:cerr <<"Task error\n";break;case WFT_STATE_SUCCESS:resp->get_result(val);// 将redis的执行结果保存起来if (val.is_error()){cerr << "Error reply. Need a password?\n";state = WFT_STATE_TASK_ERROR;}break;}if (state != WFT_STATE_SUCCESS){cerr << "Failed. Press Ctrl-C to exit.\n";return;}if(val.is_string()&& val.string_value()!="100"){cerr<<"100 is not found"<<val.string_value()<<"\n";WFRedisTask* newtask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);newtask->get_req()->set_request("GET",{val.string_value()});series_of(redisTask)->push_back(newtask);}else{cerr<<"100 is found\n";}}int main(){signal(SIGINT,handler);// 创建任务WFRedisTask * redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);// 找到请求protocol::RedisRequest * req = redisTask->get_req();req->set_request("GET", {"x1"});// 将任务交给框架redisTask->start();waitGroup.wait();cout << "finish!\n"; }
02 读取某个网站的内容,并且存入redis服务端当中(比如先访问淘宝,再set www.taobao.com 淘宝的html内容)
// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
using std::vector;
using std::string;
struct SeriesContext{string url;size_t body_len;
};
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){cout << "done\n";waitGroup.done();
}void httpCallback(WFHttpTask *httpTask){protocol::HttpResponse *resp = httpTask->get_resp(); // 获取响应int state = httpTask->get_state(); // 获取状态int error = httpTask->get_error(); // 获取错误原因switch (state){case WFT_STATE_SYS_ERROR:cerr <<"system error: " << strerror(error) << "\n";break;case WFT_STATE_DNS_ERROR:cerr <<"DNS error: " << gai_strerror(error) << "\n";break;case WFT_STATE_SSL_ERROR:cerr <<"SSL error\n";break;case WFT_STATE_TASK_ERROR:cerr <<"Task error\n";break;case WFT_STATE_SUCCESS:break;}if (state != WFT_STATE_SUCCESS){cerr <<"Failed. Press Ctrl-C to exit.\n";return;}const void *body;size_t body_len;resp->get_parsed_body(&body, &body_len); // get_parsed_body找到响应报文的报文体//创建一个redis任务WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,nullptr);redistask->get_req()->set_request("SET",{"http://www.baidu.com",static_cast<const char*>(body)});series_of(httpTask)->push_back(redistask);
}int main(){signal(SIGINT,handler);WFHttpTask * httpTask = WFTaskFactory::create_http_task("http://www.baidu.com",10,10,httpCallback);/* SeriesWork * series = Workflow::create_series_work(httpTask,nullptr); */httpTask->start();waitGroup.wait();cout << "finish!\n";
}
03 阅读下面的代码并尝试添加注释
#include <workflow/WFFacilities.h>
#include <workflow/MySQLUtil.h>
#include <workflow/MySQLResult.h>
#include <iostream>
#include <signal.h>
using std::string;
using std::cerr;
static WFFacilities::WaitGroup waitGroup(1);
void sighandler(int signum){std::cout << "signum = " << signum << "\n";waitGroup.done();
}
void mysqlCallback(WFMySQLTask * mysqlTask){if(mysqlTask->get_state() != WFT_STATE_SUCCESS){// 在系统层面报错,权限or密码cerr << "error_msg = " << WFGlobal::get_error_string(mysqlTask->get_state(), mysqlTask->get_error()) << "\n";return;}protocol::MySQLResponse * resp = mysqlTask->get_resp();if(resp->get_packet_type() == MYSQL_PACKET_ERROR){// 在SQL语句报错cerr << "error_code = " << resp->get_error_code() << " error_msg = " << resp->get_error_msg() << "\n";return;}protocol::MySQLResultCursor cursor(resp);do{if(cursor.get_cursor_status() == MYSQL_STATUS_OK){// 写类型的SQL语句cerr << "write \n";cerr << cursor.get_affected_rows() << " rows affected\n";}else if(cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT){// 读类型的SQL语句cerr << "read \n";// 读表头 列的信息 fieldconst protocol::MySQLField * const * fieldArr;fieldArr = cursor.fetch_fields();for(int i = 0; i < cursor.get_field_count(); ++i){cerr << "db = " << fieldArr[i]->get_db()<< " table = " << fieldArr[i]->get_table()<< " name = " << fieldArr[i]->get_name()<< " type = " << datatype2str(fieldArr[i]->get_data_type()) << "\n";}// 读表的内容 每一行每一列// bool fetch_all(std::vector<std::vector<MySQLCell>>& rows);std::vector<std::vector<protocol::MySQLCell>> rows;cursor.fetch_all(rows);for(auto &row:rows){for(auto &cell:row){if(cell.is_int()){cerr << cell.as_int();}else if(cell.is_string()){cerr << cell.as_string();}else if(cell.is_datetime()){cerr << cell.as_datetime();}cerr << "\t";}cerr << "\n";} }}while(cursor.next_result_set()); //mysql 任务支持一个任务处理多个SQL语句
}
int main(){signal(SIGINT,sighandler);WFMySQLTask * mysqlTask = WFTaskFactory::create_mysql_task("mysql://root:123@localhost",1,mysqlCallback);string sql = "insert into mycloud.tbl_user_token (user_name,user_token) values ('Caixukun','singdancerap');";//string sql;sql += "select * from mycloud.tbl_user_token;";mysqlTask->get_req()->set_query(sql);mysqlTask->start();waitGroup.wait();return 0;
}
#include <workflow/WFFacilities.h> // 包含 WFFacilities 库
#include <workflow/MySQLUtil.h> // 包含 MySQL 相关工具库
#include <workflow/MySQLResult.h> // 包含 MySQL 结果处理库
#include <iostream> // 包含输入输出流库
#include <signal.h> // 包含信号处理库using std::string; // 使用 string 类型
using std::cerr; // 使用 cerr 输出错误信息// 创建一个 WaitGroup 对象,用于同步
static WFFacilities::WaitGroup waitGroup(1);// 定义信号处理函数,用于处理 SIGINT 信号
void sighandler(int signum){std::cout << "signum = " << signum << "\n"; // 输出接收到的信号编号waitGroup.done(); // 完成 WaitGroup 的工作,结束程序
}// MySQL 任务的回调函数
void mysqlCallback(WFMySQLTask * mysqlTask){// 检查任务状态是否成功if(mysqlTask->get_state() != WFT_STATE_SUCCESS){// 输出系统级错误信息,如权限或密码错误cerr << "error_msg = " << WFGlobal::get_error_string(mysqlTask->get_state(), mysqlTask->get_error()) << "\n";return;}// 获取 MySQL 响应对象protocol::MySQLResponse * resp = mysqlTask->get_resp();// 检查返回的数据包类型是否为错误类型if(resp->get_packet_type() == MYSQL_PACKET_ERROR){// 输出 SQL 语句执行中的错误码和错误信息cerr << "error_code = " << resp->get_error_code() << " error_msg = " << resp->get_error_msg() << "\n";return;}// 创建 MySQL 结果游标对象protocol::MySQLResultCursor cursor(resp);// 循环处理结果集do{// 检查游标状态if(cursor.get_cursor_status() == MYSQL_STATUS_OK){// 如果是写操作,输出受影响的行数cerr << "write \n";cerr << cursor.get_affected_rows() << " rows affected\n";}else if(cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT){// 如果是读操作cerr << "read \n";// 读取字段信息const protocol::MySQLField * const * fieldArr;fieldArr = cursor.fetch_fields();for(int i = 0; i < cursor.get_field_count(); ++i){// 输出数据库、表名、字段名及其数据类型cerr << "db = " << fieldArr[i]->get_db()<< " table = " << fieldArr[i]->get_table()<< " name = " << fieldArr[i]->get_name()<< " type = " << datatype2str(fieldArr[i]->get_data_type()) << "\n";}// 读取表的所有内容std::vector<std::vector<protocol::MySQLCell>> rows;cursor.fetch_all(rows); // 获取所有行// 遍历每一行和每一个单元格for(auto &row : rows){for(auto &cell : row){// 根据单元格的数据类型输出相应的内容if(cell.is_int()){cerr << cell.as_int();}else if(cell.is_string()){cerr << cell.as_string();}else if(cell.is_datetime()){cerr << cell.as_datetime();}cerr << "\t"; // 输出制表符}cerr << "\n"; // 换行} }} while(cursor.next_result_set()); // 支持一个任务处理多个 SQL 语句
}int main(){signal(SIGINT, sighandler); // 注册 SIGINT 信号处理函数// 创建 MySQL 任务并指定连接字符串和回调函数WFMySQLTask * mysqlTask = WFTaskFactory::create_mysql_task("mysql://root:123@localhost", 1, mysqlCallback);// 构造 SQL 查询语句string sql = "insert into mycloud.tbl_user_token (user_name, user_token) values ('Caixukun', 'singdancerap');";sql += "select * from mycloud.tbl_user_token;"; // 在插入后选择用户令牌// 设置 MySQL 任务的查询mysqlTask->get_req()->set_query(sql);// 启动 MySQL 任务mysqlTask->start();// 等待 WaitGroup 完成waitGroup.wait();return 0; // 返回 0 表示程序正常结束
}