1004-05,使用workflow对象创建http任务,redis任务

目录

01_wait_group.cc

02_http_wait.cc

03_httptask_callback.cc

04_http_task_request.cc

05_redis_task_callback.cc

06_redistask_read.cc

07_series.cc

08_series_dynamic.cc

09_context.cc

10_parallel_work.cc

作业:

01 存在下列的redis键值映射关系,使用workflow的redis任务和序列,假如只知道"x1",如何找到最终的"100"?"x1" --> "x2""x2" --> "x3""x3" --> "x4""x4" --> "100"

02 读取某个网站的内容,并且存入redis服务端当中(比如先访问淘宝,再set www.taobao.com 淘宝的html内容)

03 阅读下面的代码并尝试添加注释

01_wait_group.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;static WFFacilities::WaitGroup WaitGroup(1);void handler(int signum){cout<<"done\n";WaitGroup.done();
}int main(void)
{signal(SIGINT,handler);//创建任务WFHttpTask * httpTask=WFTaskFactory::create_http_task(/* "http://www.baidu.com", */
"http://localhost/en/index.html",10,10,nullptr);//交给框架执行httpTask->start();WaitGroup.wait();cout<<"finish\n";return 0;
}/* static WFHttpTask *create_http_task(const std::string& url, *//* 									int redirect_max,//最大重定向次数 *//* 									int retry_max, //最大重试次数*//* 									http_callback_t callback//回调函数); */

02_http_wait.cc

// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){cout << "done\n";waitGroup.done();
}
int main(){signal(SIGINT,handler);// 用户代码 1 创建任务WFHttpTask * httpTask =  WFTaskFactory::create_http_task(//"http://www.baidu.com","http://localhost/en/index.html",10,10,nullptr);// 用户代码 2 把任务交给框架httpTask->start();waitGroup.wait();cout << "finish!\n";
}

03_httptask_callback.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/HttpUtil.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
//  httpCallback()
//------------------------------------//
void httpCallback(WFHttpTask* httpTask){cout<<"callback is called\n";protocol::HttpRequest* req=httpTask->get_req();protocol::HttpResponse* resq=httpTask->get_resp();int state=httpTask->get_state();int error=httpTask->get_error();//错误原因//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------////请求报文-起始行-首部字段cerr<<"method="<<req->get_method()<<"\n";cerr<<"version="<<req->get_http_version()<<"\n";cerr<<"path& query="<<req->get_request_uri()<<"\n";std::string name;std::string value;protocol::HttpHeaderCursor req_cursor(req);//类似迭代器,初始化while(req_cursor.next(name,value)){cerr<<"name ="<<name<<"  value ="<<value<<"\n";}cerr<<"\n";//------------------------------------////响应报文-起始行-首部字段-报文体cerr<<"version="<<resq->get_http_version()<<"\n";cerr<<"state code="<<resq->get_status_code()<<"\n";cerr<<"reason phrea="<<resq->get_reason_phrase()<<"\n";protocol::HttpHeaderCursor resp_cursor(resq);//类似迭代器,初始化while(resp_cursor.next(name,value)){cerr<<"name ="<<name<<"  value ="<<value<<"\n";}const void * body;size_t body_len;resq->get_parsed_body(&body,&body_len);cerr<<static_cast<const char*>(body)<<"\n";}//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFHttpTask* httpTask=WFTaskFactory::create_http_task("http://www.taobao.com",10,10,httpCallback);httpTask->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}

04_http_task_request.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/HttpMessage.h>
#include <workflow/HttpUtil.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
//  httpCallback()
//------------------------------------//
void httpCallback(WFHttpTask* httpTask){cout<<"callback is called\n";protocol::HttpRequest* req=httpTask->get_req();protocol::HttpResponse* resq=httpTask->get_resp();int state=httpTask->get_state();int error=httpTask->get_error();//错误原因//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------////请求报文-起始行-首部字段cerr<<"method="<<req->get_method()<<"\n";cerr<<"version="<<req->get_http_version()<<"\n";cerr<<"path& query="<<req->get_request_uri()<<"\n";std::string name;std::string value;protocol::HttpHeaderCursor req_cursor(req);//类似迭代器,初始化while(req_cursor.next(name,value)){cerr<<"name ="<<name<<"  value ="<<value<<"\n";}cerr<<"\n";//------------------------------------////响应报文-起始行-首部字段-报文体cerr<<"version="<<resq->get_http_version()<<"\n";cerr<<"state code="<<resq->get_status_code()<<"\n";cerr<<"reason phrea="<<resq->get_reason_phrase()<<"\n";protocol::HttpHeaderCursor resp_cursor(resq);//类似迭代器,初始化while(resp_cursor.next(name,value)){cerr<<"name ="<<name<<"  value ="<<value<<"\n";}/* const void * body; *//* size_t body_len; *//* resq->get_parsed_body(&body,&body_len); *//* cerr<<static_cast<const char*>(body)<<"\n"; *//* GET: 通常用于请求数据,不会对服务器的状态产生副作用。 */
/* POST: 用于提交数据,通常会导致服务器状态改变或者创建新的资源。 */
/* 百度的接口: *//* 百度的搜索接口一般是通过 GET 请求来获取搜索结果。因此,即使您将方法设置为 POST,如果服务器只支持 GET,请求仍然会被处理为 GET。 */}//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFHttpTask* httpTask=WFTaskFactory::create_http_task("http://www.taobao.com",10,10,httpCallback);//找到请求并设置残数protocol::HttpRequest* req=httpTask->get_req();req->set_method("POST");req->set_request_uri("/s?wd=123");//百度查询接口req->add_header_pair("myname","workflow");cerr<<"method="<<req->get_method()<<"\n";cerr<<"version="<<req->get_http_version()<<"\n";cerr<<"path& query="<<req->get_request_uri()<<"\n";cerr<<"\n";cerr<<"\n";httpTask->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}

05_redis_task_callback.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){cout<<"callback is called\n";protocol::RedisRequest *req=redistask->get_req();protocol::RedisResponse* resp=redistask->get_resp();int state=redistask->get_state();int error=redistask->get_error();//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}cout<<"callback is end\n";
}//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);cerr<<"\n";//找到修改请求protocol::RedisRequest *req=redistask->get_req();req->set_request("SET",{"huasheng","lovexixi"});redistask->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}

06_redistask_read.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>#include <signal.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){cout<<"callback is called\n";protocol::RedisRequest *req=redistask->get_req();protocol::RedisResponse* resp=redistask->get_resp();int state=redistask->get_state();int error=redistask->get_error();protocol::RedisValue val;//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功resp->get_result(val);if(val.is_error()){cerr<<"error reply,need a password? \n";state=WFT_STATE_TASK_ERROR;}break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------////查看redis执行的结果/* protocol::RedisValue val; *//* resp->get_result(val); */if(val.is_string()){cerr<<"value is string:  "<<val.string_value()<<"\n";}if(val.is_array()){cerr<<"value is array: \n ";for(size_t i=0;i<val.arr_size();++i){cerr<<i<<"  value:"<<val.arr_at(i).string_value()<<"\n";}}cout<<"callback is end\n";
}//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);cerr<<"\n";//找到修改请求protocol::RedisRequest *req=redistask->get_req();/* req->set_request("SET",{"huasheng","lovexixi"}); */req->set_request("HGETALL",{"aa"});redistask->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}

07_series.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>#include <signal.h>
#include <unistd.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback(WFRedisTask* redistask){cout<<"callback is called\n";protocol::RedisRequest *req=redistask->get_req();protocol::RedisResponse* resp=redistask->get_resp();int state=redistask->get_state();int error=redistask->get_error();protocol::RedisValue val;//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功resp->get_result(val);if(val.is_error()){cerr<<"error reply,need a password? \n";state=WFT_STATE_TASK_ERROR;}break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------////查看redis执行的结果/* protocol::RedisValue val; *//* resp->get_result(val); */if(val.is_string()){cerr<<"value is string:  "<<val.string_value()<<"\n";}if(val.is_array()){cerr<<"value is array: \n ";for(size_t i=0;i<val.arr_size();++i){cerr<<i<<"  value:"<<val.arr_at(i).string_value()<<"\n";}}/* sleep(2); */cout<<"callback is end\n";
}//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);protocol::RedisRequest *req=redistask->get_req();req->set_request("SET",{"07dada","07lovexixi"});WFRedisTask* redistask1=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);protocol::RedisRequest *req1=redistask1->get_req();req1->set_request("GET",{"07dada"});/* redistask->start(); *//* redistask1->start(); *///没有固定先后顺序SeriesWork* series=Workflow::create_series_work(redistask,nullptr);series->push_back(redistask1);series->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}

08_series_dynamic.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>#include <signal.h>
#include <unistd.h>
#include <iostream>using std::cout;
using std::cerr;//------------------------------------//
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback1(WFRedisTask* redistask){cout<<"callback111\n";
}void redisCallback(WFRedisTask* redistask){cout<<"callback is called\n";protocol::RedisRequest *req=redistask->get_req();protocol::RedisResponse* resp=redistask->get_resp();int state=redistask->get_state();int error=redistask->get_error();protocol::RedisValue val;//状态处理switch(state) {case WFT_STATE_SYS_ERROR: // 系统错误cerr <<"system error: " << strerror(error) << "\n"; break;case WFT_STATE_DNS_ERROR: // DNS错误cerr <<"DNS error: " << gai_strerror(error) << "\n"; break;case WFT_STATE_SSL_ERROR: // SSL错误cerr <<"SSL error: " << error << "\n"; break;case WFT_STATE_TASK_ERROR: // 任务错误cerr <<"Task error: "<< error << "\n"; break;case WFT_STATE_SUCCESS: // 请求成功resp->get_result(val);if(val.is_error()){cerr<<"error reply,need a password? \n";state=WFT_STATE_TASK_ERROR;}break; }//错误处理if (state != WFT_STATE_SUCCESS) {cerr <<"Failed. Press Ctrl-C to exit.\n";return;}//------------------------------------//if(val.is_string()){cerr<<"value is string:  "<<val.string_value()<<"\n";//在正在执行的任务队列中添加任务WFRedisTask* redistask1=WFTaskFactory::create_redis_task(/* "redis://127.0.0.1:6379",10,redisCallback); */"redis://127.0.0.1:6379",10,redisCallback1);//递归调用redistask1->get_req()->set_request("SET",{"07dada","07lovexixi"});series_of(redistask)->push_back(redistask1);}if(val.is_array()){cerr<<"value is array: \n ";for(size_t i=0;i<val.arr_size();++i){cerr<<i<<"  value:"<<val.arr_at(i).string_value()<<"\n";}}/* sleep(2); */cout<<"callback is end\n";
}//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask* redistask1=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);protocol::RedisRequest *req1=redistask1->get_req();req1->set_request("GET",{"07dada"});/* redistask->start(); */redistask1->start();//没有固定先后顺序/* SeriesWork* series=Workflow::create_series_work(redistask,nullptr); *//* series->push_back(redistask1); *//* series->start(); */waitGroup.wait();cerr<<"\nfinish\n";return 0;
}

09_context.cc

#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>#include <signal.h>
#include <unistd.h>
#include <iostream>using std::cout;
using std::cerr;
using std::string;
using std::vector;//------------------------------------//
struct SeriesContext{int id;std::string name;
};
static WFFacilities::WaitGroup waitGroup(1);void sighandler(int signum){waitGroup.done();//waitGroup(--num);cout<<"done\n";
}//------------------------------------//
//  redisCallback()
//------------------------------------//
void redisCallback1(WFRedisTask *redisTask){cerr << "xixi 1 begin\n";SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());cerr << "before id = " << context->id << " name = " << context->name << "\n";context->id = 1001;context->name = "task1";cerr << "after id = " << context->id << " name = " << context->name << "\n";cerr << "xixi 1 end!\n";
}
void redisCallback2(WFRedisTask *redisTask){cerr << "xixi 2 begin\n";SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());cerr << "before id = " << context->id << " name = " << context->name << "\n";context->id = 1002;context->name = "task2";cerr << "after id = " << context->id << " name = " << context->name << "\n";cerr << "xixi 2 end!\n";
}
void redisCallback3(WFRedisTask *redisTask){cerr << "xixi 3 begin\n";SeriesContext * context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());cerr << "before id = " << context->id << " name = " << context->name << "\n";cerr << "xixi 3 end!\n";
}void Callback(const SeriesWork* series){SeriesContext* context=static_cast<SeriesContext*>(series->get_context());cerr<<"callback id ="<<context->id<<"  name="<<context->name<<"\n";delete context;
}//------------------------------------//
//  main()
//------------------------------------//
int main(void)
{signal(SIGINT,sighandler);WFRedisTask * redisTask1 =  WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback1);redisTask1->get_req()->set_request("SET",{"key","123"});WFRedisTask * redisTask2 =  WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);redisTask2->get_req()->set_request("SET",{"key","123"});WFRedisTask * redisTask3 =  WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback3);redisTask3->get_req()->set_request("SET",{"key","123"});SeriesWork* series=Workflow::create_series_work(redisTask1,nullptr);series->push_back(redisTask2);/* series->push_back(redisTask3); */SeriesContext* context=new SeriesContext({1000,"mian"});series->set_context(context);series->set_callback([context](const SeriesWork*series){cerr<<"callback id ="<<context->id<<"  name="<<context->name<<"\n";delete context;});series->start();waitGroup.wait();cerr<<"\nfinish\n";return 0;
}

10_parallel_work.cc

// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
using std::vector;
using std::string;
struct SeriesContext{string url;size_t body_len;
};
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){cout << "done\n";waitGroup.done();
}void httpCallback(WFHttpTask *httpTask){protocol::HttpResponse *resp = httpTask->get_resp(); // 获取响应int state = httpTask->get_state(); // 获取状态int error = httpTask->get_error(); // 获取错误原因switch (state){case WFT_STATE_SYS_ERROR:cerr <<"system error: " << strerror(error) << "\n";break;case WFT_STATE_DNS_ERROR:cerr <<"DNS error: "  << gai_strerror(error) << "\n";break;case WFT_STATE_SSL_ERROR:cerr <<"SSL error\n";break;case WFT_STATE_TASK_ERROR:cerr <<"Task error\n";break;case WFT_STATE_SUCCESS:break;}if (state != WFT_STATE_SUCCESS){cerr <<"Failed. Press Ctrl-C to exit.\n";return;}const void *body;size_t body_len;resp->get_parsed_body(&body, &body_len); // get_parsed_body找到响应报文的报文体SeriesContext * context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());context->body_len = body_len;cerr << "url = " << context->url << ", len = " << context->body_len << "\n";
}
void parallelCallback(const ParallelWork * parallelWork){cerr << "parallel callback\n";string name;size_t body_len = 0;for(int i = 0; i < 3; ++i){// 找到内部的(已经执行完成不可修改的)序列const SeriesWork * series = parallelWork->series_at(i);SeriesContext * context = static_cast<SeriesContext *>(series->get_context());cerr << "i = " << i << "url = " << context->url << ", len = " << context->body_len << "\n";if(body_len < context->body_len){body_len = context->body_len;name = context->url;}delete context;}cerr << "longest body_len  url = " << name << " body_len = " << body_len <<"\n";WFRedisTask * redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,nullptr);redisTask->get_req()->set_request("SET",{name,std::to_string(body_len)});series_of(parallelWork)->push_back(redisTask);
}
int main(){signal(SIGINT,handler);// 创建一个空的并行任务ParallelWork * parallelWork = Workflow::create_parallel_work(parallelCallback);// 创建多个小序列vector<string> urls = {"http://www.taobao.com","http://www.jd.com","http://www.baidu.com"};for(int i = 0; i < 3; ++i){// 创建一个http任务WFHttpTask * httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback);// 根据http任务创建小序列SeriesWork * series =  Workflow::create_series_work(httpTask,nullptr);// 往序列中加一个contextSeriesContext *context = new SeriesContext;context->url = urls[i];series->set_context(context);// 把小序列加入并行任务parallelWork->add_series(series);}parallelWork->start();waitGroup.wait();cout << "finish!\n";
}

作业:

01 存在下列的redis键值映射关系,使用workflow的redis任务和序列,假如只知道"x1",如何找到最终的"100"?"x1" --> "x2""x2" --> "x3""x3" --> "x4""x4" --> "100"

// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){cout << "done\n";waitGroup.done();
}
void redisCallback(WFRedisTask *redisTask)
{protocol::RedisRequest *req = redisTask->get_req();protocol::RedisResponse *resp = redisTask->get_resp();int state = redisTask->get_state();int error = redisTask->get_error();// val用来保存redis执行的结果protocol::RedisValue val;switch (state){case WFT_STATE_SYS_ERROR:cerr <<"system error: " << strerror(error) << "\n";break;case WFT_STATE_DNS_ERROR:cerr <<"DNS error: "  << gai_strerror(error) << "\n";break;case WFT_STATE_SSL_ERROR:cerr <<"SSL error\n";break;case WFT_STATE_TASK_ERROR:cerr <<"Task error\n";break;case WFT_STATE_SUCCESS:resp->get_result(val);// 将redis的执行结果保存起来if (val.is_error()){cerr <<  "Error reply. Need a password?\n";state = WFT_STATE_TASK_ERROR;}break;}if (state != WFT_STATE_SUCCESS){cerr <<  "Failed. Press Ctrl-C to exit.\n";return;}if(val.is_string()&& val.string_value()!="100"){cerr<<"100 is not found"<<val.string_value()<<"\n";WFRedisTask* newtask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);newtask->get_req()->set_request("GET",{val.string_value()});series_of(redisTask)->push_back(newtask);}else{cerr<<"100 is found\n";}}int main(){signal(SIGINT,handler);// 创建任务WFRedisTask * redisTask =  WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);// 找到请求protocol::RedisRequest * req =  redisTask->get_req();req->set_request("GET", {"x1"});// 将任务交给框架redisTask->start();waitGroup.wait();cout << "finish!\n";
}

02 读取某个网站的内容,并且存入redis服务端当中(比如先访问淘宝,再set www.taobao.com 淘宝的html内容)

// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
using std::vector;
using std::string;
struct SeriesContext{string url;size_t body_len;
};
static WFFacilities::WaitGroup waitGroup(1);
void handler(int signum){cout << "done\n";waitGroup.done();
}void httpCallback(WFHttpTask *httpTask){protocol::HttpResponse *resp = httpTask->get_resp(); // 获取响应int state = httpTask->get_state(); // 获取状态int error = httpTask->get_error(); // 获取错误原因switch (state){case WFT_STATE_SYS_ERROR:cerr <<"system error: " << strerror(error) << "\n";break;case WFT_STATE_DNS_ERROR:cerr <<"DNS error: "  << gai_strerror(error) << "\n";break;case WFT_STATE_SSL_ERROR:cerr <<"SSL error\n";break;case WFT_STATE_TASK_ERROR:cerr <<"Task error\n";break;case WFT_STATE_SUCCESS:break;}if (state != WFT_STATE_SUCCESS){cerr <<"Failed. Press Ctrl-C to exit.\n";return;}const void *body;size_t body_len;resp->get_parsed_body(&body, &body_len); // get_parsed_body找到响应报文的报文体//创建一个redis任务WFRedisTask* redistask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,nullptr);redistask->get_req()->set_request("SET",{"http://www.baidu.com",static_cast<const char*>(body)});series_of(httpTask)->push_back(redistask);
}int main(){signal(SIGINT,handler);WFHttpTask * httpTask = WFTaskFactory::create_http_task("http://www.baidu.com",10,10,httpCallback);/* SeriesWork * series =  Workflow::create_series_work(httpTask,nullptr); */httpTask->start();waitGroup.wait();cout << "finish!\n";
}

03 阅读下面的代码并尝试添加注释

#include <workflow/WFFacilities.h>
#include <workflow/MySQLUtil.h>
#include <workflow/MySQLResult.h>
#include <iostream>
#include <signal.h>
using std::string;
using std::cerr;
static WFFacilities::WaitGroup waitGroup(1);
void sighandler(int signum){std::cout << "signum = " << signum << "\n";waitGroup.done();
}
void mysqlCallback(WFMySQLTask * mysqlTask){if(mysqlTask->get_state() != WFT_STATE_SUCCESS){// 在系统层面报错,权限or密码cerr << "error_msg =  " << WFGlobal::get_error_string(mysqlTask->get_state(), mysqlTask->get_error()) << "\n";return;}protocol::MySQLResponse * resp = mysqlTask->get_resp();if(resp->get_packet_type() == MYSQL_PACKET_ERROR){// 在SQL语句报错cerr << "error_code = " << resp->get_error_code() << " error_msg = " << resp->get_error_msg() << "\n";return;}protocol::MySQLResultCursor cursor(resp);do{if(cursor.get_cursor_status() == MYSQL_STATUS_OK){// 写类型的SQL语句cerr << "write \n";cerr << cursor.get_affected_rows() << " rows affected\n";}else if(cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT){// 读类型的SQL语句cerr << "read \n";// 读表头 列的信息 fieldconst protocol::MySQLField * const * fieldArr;fieldArr = cursor.fetch_fields();for(int i = 0; i < cursor.get_field_count(); ++i){cerr << "db = " << fieldArr[i]->get_db()<< " table = " << fieldArr[i]->get_table()<< " name = " << fieldArr[i]->get_name()<< " type = " << datatype2str(fieldArr[i]->get_data_type()) << "\n";}// 读表的内容 每一行每一列// bool fetch_all(std::vector<std::vector<MySQLCell>>& rows);std::vector<std::vector<protocol::MySQLCell>> rows;cursor.fetch_all(rows);for(auto &row:rows){for(auto &cell:row){if(cell.is_int()){cerr << cell.as_int();}else if(cell.is_string()){cerr << cell.as_string();}else if(cell.is_datetime()){cerr << cell.as_datetime();}cerr << "\t";}cerr << "\n";} }}while(cursor.next_result_set()); //mysql 任务支持一个任务处理多个SQL语句
}
int main(){signal(SIGINT,sighandler);WFMySQLTask * mysqlTask =  WFTaskFactory::create_mysql_task("mysql://root:123@localhost",1,mysqlCallback);string sql = "insert into mycloud.tbl_user_token (user_name,user_token) values ('Caixukun','singdancerap');";//string sql;sql += "select * from mycloud.tbl_user_token;";mysqlTask->get_req()->set_query(sql);mysqlTask->start();waitGroup.wait();return 0;
}
#include <workflow/WFFacilities.h> // 包含 WFFacilities 库
#include <workflow/MySQLUtil.h>    // 包含 MySQL 相关工具库
#include <workflow/MySQLResult.h>  // 包含 MySQL 结果处理库
#include <iostream>                // 包含输入输出流库
#include <signal.h>                // 包含信号处理库using std::string; // 使用 string 类型
using std::cerr;   // 使用 cerr 输出错误信息// 创建一个 WaitGroup 对象,用于同步
static WFFacilities::WaitGroup waitGroup(1);// 定义信号处理函数,用于处理 SIGINT 信号
void sighandler(int signum){std::cout << "signum = " << signum << "\n"; // 输出接收到的信号编号waitGroup.done(); // 完成 WaitGroup 的工作,结束程序
}// MySQL 任务的回调函数
void mysqlCallback(WFMySQLTask * mysqlTask){// 检查任务状态是否成功if(mysqlTask->get_state() != WFT_STATE_SUCCESS){// 输出系统级错误信息,如权限或密码错误cerr << "error_msg =  " << WFGlobal::get_error_string(mysqlTask->get_state(), mysqlTask->get_error()) << "\n";return;}// 获取 MySQL 响应对象protocol::MySQLResponse * resp = mysqlTask->get_resp();// 检查返回的数据包类型是否为错误类型if(resp->get_packet_type() == MYSQL_PACKET_ERROR){// 输出 SQL 语句执行中的错误码和错误信息cerr << "error_code = " << resp->get_error_code() << " error_msg = " << resp->get_error_msg() << "\n";return;}// 创建 MySQL 结果游标对象protocol::MySQLResultCursor cursor(resp);// 循环处理结果集do{// 检查游标状态if(cursor.get_cursor_status() == MYSQL_STATUS_OK){// 如果是写操作,输出受影响的行数cerr << "write \n";cerr << cursor.get_affected_rows() << " rows affected\n";}else if(cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT){// 如果是读操作cerr << "read \n";// 读取字段信息const protocol::MySQLField * const * fieldArr;fieldArr = cursor.fetch_fields();for(int i = 0; i < cursor.get_field_count(); ++i){// 输出数据库、表名、字段名及其数据类型cerr << "db = " << fieldArr[i]->get_db()<< " table = " << fieldArr[i]->get_table()<< " name = " << fieldArr[i]->get_name()<< " type = " << datatype2str(fieldArr[i]->get_data_type()) << "\n";}// 读取表的所有内容std::vector<std::vector<protocol::MySQLCell>> rows;cursor.fetch_all(rows); // 获取所有行// 遍历每一行和每一个单元格for(auto &row : rows){for(auto &cell : row){// 根据单元格的数据类型输出相应的内容if(cell.is_int()){cerr << cell.as_int();}else if(cell.is_string()){cerr << cell.as_string();}else if(cell.is_datetime()){cerr << cell.as_datetime();}cerr << "\t"; // 输出制表符}cerr << "\n"; // 换行} }} while(cursor.next_result_set()); // 支持一个任务处理多个 SQL 语句
}int main(){signal(SIGINT, sighandler); // 注册 SIGINT 信号处理函数// 创建 MySQL 任务并指定连接字符串和回调函数WFMySQLTask * mysqlTask =  WFTaskFactory::create_mysql_task("mysql://root:123@localhost", 1, mysqlCallback);// 构造 SQL 查询语句string sql = "insert into mycloud.tbl_user_token (user_name, user_token) values ('Caixukun', 'singdancerap');";sql += "select * from mycloud.tbl_user_token;"; // 在插入后选择用户令牌// 设置 MySQL 任务的查询mysqlTask->get_req()->set_query(sql);// 启动 MySQL 任务mysqlTask->start();// 等待 WaitGroup 完成waitGroup.wait();return 0; // 返回 0 表示程序正常结束
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1556132.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

系统设计,如何设计一个秒杀功能

需要解决的问题 瞬时流量的承接防止超卖预防黑产避免对正常服务的影响兜底方法 前端设计 利用 CDN 缓存静态资源&#xff0c;减轻服务器的压力在前端随机限流按钮防抖&#xff0c;防止用户重复点击 后端设计 Nginx 做统一接入&#xff0c;进行负载均衡与限流用 sentinel 等…

工具 | 红队大佬亲测5款推荐的Burpsuite插件

*免责声明&#xff1a;* *本文章仅用于信息安全技术分享&#xff0c;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;所产生的一切不良后果与文章作…

【LeetCode-热题100-128题】官方题解好像有误

最长连续序列 题目链接&#xff1a;https://leetcode.cn/problems/longest-consecutive-sequence/?envTypestudy-plan-v2&envIdtop-100-liked 给定一个未排序的整数数组 nums &#xff0c;找出数字连续的最长序列&#xff08;不要求序列元素在原数组中连续&#xff09;的…

LLM大模型学习精要系列(一):掌握基础,开启大模型之旅

1.前言 1.1 基础模型研究 2023 年&#xff0c;随着 LLM 技术的发展&#xff0c;中国模型研究机构的开源模型迎来了爆发式的增长&#xff1a; 2023 年 3 月&#xff0c;智谱 AI 首先在魔搭社区发布了 ChatGLM-6B 系列&#xff0c;ChatGLM-6B 是一个开源的、支持中英双语问答的…

如何只修改obsidian图片链接为markdown

如何只修改obsidian图片链接为markdown 前言插件配置 使用注意 前言 适合有一定了解obsidian用法和插件市场&#xff0c;还有相对路径的人 插件 在obsidian插件市场搜索—开梯子 配置 首先使用ctrlp打开命令面板&#xff0c;也可以在左侧通过图标打开命令面板&#xff0c…

车载电子电气架构--- 车载诊断DTC全覆盖分类

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明自己,无利益不试图说服别人,是精神上的节…

智能制造的人机料法环的内涵

在生产和管理领域,有个很重要的概念叫 “人、机、料、法、环”。 “人” 就是参与其中的人员,他们的技能、态度、责任心等对事情的结果影响很大; “机” 指的是机器设备和工具等,就像干活要用的家伙事儿,好不好用、正不正常直接关系到工作的效率和质量; “料” 呢,就…

MathType软件7.9最新版本下载超实用指南

嗨&#xff0c;各位学霸、研究僧还有教育界的大佬们&#xff01;&#x1f44b; 今天我要给你们安利的不是一道数学题&#xff0c;也不是一本教科书&#xff0c;而是一款让你分分钟爱上数学的神器——MathType软件&#xff01;&#x1f389; #### &#x1f4da; **MathType是什…

DataX+Crontab实现多任务顺序定时同步

DataX+Crontab实现多任务顺序定时同步 前言 DataX 是一款支持在异构数据源之间离线同步数据的工具, DataX 通过输入一些命令执行 json 配置文件,这样使用起来并不是很方便, DataX 也不支持定时任务调度,它仅支持一次性同步任务。所以 DataX 的这些特点造成了它无法完成一些…

S7-200 SMART Modbus RTU常见问题

1.S7-200 SMART 是否支持 Modbus ASCII 通信模式&#xff1f; STEP 7-Micro/WIN SMART 软件未提供Modbus ASCII 通信模式指令库。S7-200 SMART CPU若用于Modbus ASCII 通信时&#xff0c;则需要用户使用自由口通信模式进行编程。 2.S7-200 SMART CPU 集成的RS485 端口&#xf…

YOLOv10改进,YOLOv10添加CA注意力机制,二次创新C2f结构,助力涨点

改进前训练结果: 二次创新C2f结构训练结果: 摘要 在本文中,提出了一种新的移动网络注意力机制,将位置信息嵌入到信道注意力中称之为“协调注意力”。与渠道关注不同通过 2D 全局池将特征张量转换为单个特征向量,坐标注意力因子将通道注意力转化为两个 1D 特征编码过程…

STAR数据集:首个用于大型卫星图像中场景图生成大规模数据集

2024-06-12&#xff0c;在遥感图像领域&#xff0c;由武汉大学等机构联合创建的STAR数据集&#xff0c;标志着场景图生成技术在大规模、高分辨率卫星图像中的新突破。 一、研究背景&#xff1a; 场景图生成(Scene Graph Generation, SGG)技术在自然图像中已取得显著进展&#…

[C语言]指针和数组

目录 1.数组的地址 2.通过指针访问数组 3.数组和指针的不同点 4.指针数组 1.数组的地址 数组的地址是什么&#xff1f; 看下面一组代码 #include <stdio.h> int main() { int arr[5] {5,4,3,2,1}; printf("&arr[0] %p\n", &arr[0]); printf(&qu…

个人网站,怎么操作才能提升个人网站的流量

运营个人网站以提升流量是一个综合性的过程&#xff0c;涉及内容优化、技术调整、用户体验提升以及外部推广等多个方面。以下是一些专业建议&#xff0c;旨在帮助个人网站运营者有效提升网站流量&#xff1a; 1.精准关键词研究与优化 -关键词研究&#xff1a;利用工具如谷歌…

【YOLO学习】YOLOv3详解

文章目录 1. 网络结构1.1 结构介绍1.2 改进 2. 训练与测试过程3. 总结 1. 网络结构 1.1 结构介绍 1. 与 YOLOv2 不同的是&#xff0c;YOLOv3 在 Darknet-19 里加入了 ResNet 残差连接&#xff0c;改进之后的模型叫 Darknet-53。在 ImageNet上 实验发现 Darknet-53 相对于 ResN…

数据结构与算法篇(树 - 常见术语)

目录 一、什么是树&#xff1f; 二、相关术语 根结点 边 叶子结点 兄弟结点 祖先结点 结点的大小 树的层 结点的深度 结点的高度 树的高度 斜树 一、什么是树&#xff1f; 树是一种类似于链表的数据结构&#xff0c;不过链表的结点是以线性方式简单地指向其后继结…

ORB-SLAM复现时遇到的问题(复现失败,切莫当做教程)

背景 本人的环境&#xff1a;使用ubuntu20.04&#xff0c;opencv4 问题 进行Build DBoW2. Go into Thirdparty/DBoW2/ and execute:时&#xff0c;运行make时出错 我安装的opencv4&#xff0c;在 OpenCV 3 和更高版本中&#xff0c;头文件的路径可能已更改。例如&#xff0…

[单master节点k8s部署]32.ceph分布式存储(三)

基于ceph rbd生成pv 在集群中认证ceph 用下面代码生成ceph的secret .创建 ceph 的 secret&#xff0c;在 k8s 的控制节点操作&#xff1a; 回到 ceph 管理节点创建 pool 池&#xff1a; [rootmaster1-admin ~]# ceph osd pool create k8stest 56 pool k8stest created [rootm…

免费音频剪辑软件大揭秘:让声音创作更轻松

在精神娱乐越发丰富的现在&#xff0c;音频内容的创作和编辑变得越来越重要。无论是专业的音乐制作人&#xff0c;还是自媒体创作者&#xff0c;都可能需要一款功能强大且易于使用的音频剪辑软件来处理音频素材。今天我们一同来探讨有什么好用的免费音频剪辑软件吧。 1.福昕音…

golang gin入门

gin是个小而精的web开发框架 官方文档 安装 go get -u github.com/gin-gonic/gin最简单的起手代码 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {r : gin.Default()r.GET("/ping", func(c *gin.Context) {c.JSON…