C++中的Reactor原理与实现

C++中的Reactor原理与实现

目录

一、Reactor介绍

二、代码实现

一、Reactor介绍

Reactor设计模式是事件驱动架构(event-driven architecture)的一种实现方式,用于处理多个客户端并发地向服务端请求服务的场景。每种服务在服务端可能由多个方法组成。Reactor会将并发到达的服务请求解耦,并分发给对应的事件处理器来处理。

中心思想是将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、可写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。

处理机制为:主程序将事件以及对应的事件处理方法在Reactor上进行注册,如果相应的事件发生,Reactor将会主动调用该事件注册的接口,即回调函数。

二、代码实现

前提准备:
1. 单例模式:单例模式(Singleton Pattern,也称为单件模式)是使用最广泛的设计模式之一。其意图是保证一个类(结构体)仅有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。
2. 回调函数:把一段可执行的代码像参数传递那样传给其他代码,而这段代码会在某个时刻被调用执行,这就叫做回调。

epoll反应堆中的结构体定义

/*fd包含的属性*/ struct nitem { // fd int fd;//要监听的文件描述符 int status;//是否在监听:1->在红黑树上(监听),0->不在(不监听) int events;//对应的监听事件,EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数) void *arg;//指向自己结构体指针 #if 0 NCALLBACK callback; #else NCALLBACK *readcb; // epollin NCALLBACK *writecb; // epollout NCALLBACK *acceptcb; // epollin #endif unsigned char sbuffer[BUFFER_LENGTH]; // int slength; unsigned char rbuffer[BUFFER_LENGTH]; int rlength; }; /*分块存储*/ struct itemblock { struct itemblock *next; struct nitem *items; }; /*epoll反应堆中包括通信的fd以及epoll的(epfd)*/ struct reactor { int epfd; struct itemblock *head; };

单例模式,创建reactor的一个实例

/*单例模式*/ struct reactor *instance = NULL; int init_reactor(struct reactor *r) { if (r == NULL) return -1; int epfd = epoll_create(1); //int size r->epfd = epfd; // fd --> item r->head = (struct itemblock*)malloc(sizeof(struct itemblock)); if (r->head == NULL) { close(epfd); return -2; } memset(r->head, 0, sizeof(struct itemblock)); r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem)); if (r->head->items == NULL) { free(r->head); close(epfd); return -2; } memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem))); r->head->next = NULL; return 0; } struct reactor *getInstance(void) { //singleton if (instance == NULL) { instance = (struct reactor *)malloc(sizeof(struct reactor)); if (instance == NULL) return NULL; memset(instance, 0, sizeof(struct reactor)); if (0 > init_reactor(instance)) { free(instance); return NULL; } } return instance; }

事件注册

/*
 * Register (or re-register) a callback for fd with the reactor.
 *
 * Typical calls:
 *   nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
 *   nreactor_set_event(fd, read_callback, READ_CB, NULL);
 *
 * fd    - descriptor to monitor; also the index into the item array, so it
 *         must satisfy 0 <= fd < MAX_EPOLL_EVENT
 * cb    - callback to run when the event fires
 * event - READ_CB, WRITE_CB or ACCEPT_CB
 * arg   - opaque value stored alongside the callback
 *
 * Returns 0 on success, -1 if the reactor is unavailable, fd is out of
 * range, event is not a known kind, or epoll_ctl fails.
 */
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg)
{
    struct reactor *r = getInstance();
    if (r == NULL)
        return -1;

    /* items[] holds exactly MAX_EPOLL_EVENT entries indexed by fd. */
    if (fd < 0 || fd >= MAX_EPOLL_EVENT) {
        printf("nreactor_set_event: fd %d out of range\n", fd);
        return -1;
    }

    struct nitem *item = &r->head->items[fd];
    struct epoll_event ev = {0};

    switch (event) {
    case READ_CB:
        item->readcb = cb;
        ev.events = EPOLLIN;
        break;
    case WRITE_CB:
        item->writecb = cb;
        ev.events = EPOLLOUT;
        break;
    case ACCEPT_CB:
        item->acceptcb = cb;
        ev.events = EPOLLIN;
        break;
    default:
        /* Unknown kind: registering an empty event mask would be useless. */
        return -1;
    }

    item->fd = fd;
    item->arg = arg;
    ev.data.ptr = item;  /* lets the event loop get back to the item */

    if (item->events == NOSET_CB) {
        /* Not registered yet: add the fd to the epoll set. */
        if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
            return -1;
        }
        item->events = event;
    } else if (item->events != event) {
        /* Already registered for a different kind: switch the event mask. */
        if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_MOD failed\n");
            return -1;
        }
        item->events = event;
    }
    /* Same kind as before: the callback pointer was refreshed above and the
     * kernel registration is already correct. */

    return 0;
}

回调函数书写

int write_callback(int fd, int event, void *arg) { struct reactor *R = getInstance(); unsigned char *sbuffer = R->head->items[fd].sbuffer; int length = R->head->items[fd].slength; int ret = send(fd, sbuffer, length, 0); if (ret < length) { nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } else { nreactor_set_event(fd, read_callback, READ_CB, NULL); } return 0; } // 5k qps int read_callback(int fd, int event, void *arg) { struct reactor *R = getInstance(); unsigned char *buffer = R->head->items[fd].rbuffer; #if 0 //ET int idx = 0, ret = 0; while (idx < BUFFER_LENGTH) { ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0); if (ret == -1) { break; } else if (ret > 0) { idx += ret; } else {// == 0 break; } } if (idx == BUFFER_LENGTH && ret != -1) { nreactor_set_event(fd, read_callback, READ_CB, NULL); } else if (ret == 0) { nreactor_set_event //close(fd); } else { nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } #else //LT int ret = recv(fd, buffer, BUFFER_LENGTH, 0); if (ret == 0) { // fin nreactor_del_event(fd, NULL, 0, NULL); close(fd); } else if (ret > 0) { unsigned char *sbuffer = R->head->items[fd].sbuffer; memcpy(sbuffer, buffer, ret); R->head->items[fd].slength = ret; printf("readcb: %s\n", sbuffer); nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } #endif } // web server // ET / LT int accept_callback(int fd, int event, void *arg) { int connfd; struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } nreactor_set_event(connfd, read_callback, READ_CB, NULL); }

监听描述符变化

// accept --> EPOLL /*epoll_wait监听0*/ int reactor_loop(int listenfd) { struct reactor *R = getInstance(); struct epoll_event events[POLL_SIZE] = {0}; while (1) { int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1); if (nready == -1) { continue; } int i = 0; for (i = 0;i < nready;i ++) { struct nitem *item = (struct nitem *)events[i].data.ptr; int connfd = item->fd; if (connfd == listenfd) { // item->acceptcb(listenfd, 0, NULL); } else { if (events[i].events & EPOLLIN) { // item->readcb(connfd, 0, NULL); } if (events[i].events & EPOLLOUT) { item->writecb(connfd, 0, NULL); } } } } return 0; }

完整代码实现

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MAXLNE          4096
#define POLL_SIZE       1024   /* max events fetched per epoll_wait call */
#define BUFFER_LENGTH   1024   /* size of each per-fd send/receive buffer */
#define MAX_EPOLL_EVENT 1024   /* number of items, i.e. highest fd + 1 */

/* Kinds of callback that can be registered for an fd. */
#define NOSET_CB  0
#define READ_CB   1
#define WRITE_CB  2
#define ACCEPT_CB 3

/* Signature shared by all reactor callbacks. */
typedef int NCALLBACK(int fd, int event, void *arg);

/*
 * Per-fd state kept by the reactor. Items live in an array indexed by the
 * file descriptor itself.
 */
struct nitem {
    int fd;      /* file descriptor being monitored */
    int status;  /* 1 -> registered with epoll (monitored), 0 -> not registered;
                    NOTE(review): nothing in this file updates this field */
    int events;  /* kind of callback currently registered (NOSET_CB when none) */
    void *arg;   /* opaque argument supplied at registration time */

    /* One callback per event kind; different events take different paths. */
    NCALLBACK *readcb;    /* invoked on EPOLLIN for a connection fd */
    NCALLBACK *writecb;   /* invoked on EPOLLOUT */
    NCALLBACK *acceptcb;  /* invoked on EPOLLIN for the listening fd */

    unsigned char sbuffer[BUFFER_LENGTH];  /* pending outgoing data */
    int slength;                           /* number of valid bytes in sbuffer */

    unsigned char rbuffer[BUFFER_LENGTH];  /* most recently received data */
    int rlength;                           /* reserved for the byte count of rbuffer */
};

/* Block of items; blocks can be chained through `next`. Only the head block
 * is used by this program. */
struct itemblock {
    struct itemblock *next;
    struct nitem *items;
};

/* The reactor itself: the epoll instance plus the per-fd item storage. */
struct reactor {
    int epfd;
    struct itemblock *head;
};

int init_reactor(struct reactor *r);
int read_callback(int fd, int event, void *arg);
int write_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg);

/* Singleton: the one reactor shared by the whole program. */
struct reactor *instance = NULL;

/*
 * Return the process-wide reactor, creating it on first use.
 *
 * Returns NULL if the reactor cannot be created. The global `instance` is
 * assigned only after initialisation succeeds, so a failed attempt never
 * leaves a dangling pointer behind.
 */
struct reactor *getInstance(void)
{
    if (instance != NULL)
        return instance;

    struct reactor *r = (struct reactor *)calloc(1, sizeof *r);
    if (r == NULL)
        return NULL;

    if (init_reactor(r) < 0) {
        free(r);
        return NULL;
    }

    instance = r;
    return instance;
}

/*
 * Register (or re-register) a callback for fd with the reactor.
 *
 * Typical calls:
 *   nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
 *   nreactor_set_event(fd, read_callback, READ_CB, NULL);
 *
 * fd    - descriptor to monitor; must satisfy 0 <= fd < MAX_EPOLL_EVENT
 * cb    - callback to run when the event fires
 * event - READ_CB, WRITE_CB or ACCEPT_CB
 * arg   - opaque value stored alongside the callback
 *
 * Returns 0 on success, -1 if the reactor is unavailable, fd is out of
 * range, event is not a known kind, or epoll_ctl fails.
 */
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg)
{
    struct reactor *r = getInstance();
    if (r == NULL)
        return -1;

    /* items[] holds exactly MAX_EPOLL_EVENT entries indexed by fd. */
    if (fd < 0 || fd >= MAX_EPOLL_EVENT) {
        printf("nreactor_set_event: fd %d out of range\n", fd);
        return -1;
    }

    struct nitem *item = &r->head->items[fd];
    struct epoll_event ev = {0};

    switch (event) {
    case READ_CB:
        item->readcb = cb;
        ev.events = EPOLLIN;
        break;
    case WRITE_CB:
        item->writecb = cb;
        ev.events = EPOLLOUT;
        break;
    case ACCEPT_CB:
        item->acceptcb = cb;
        ev.events = EPOLLIN;
        break;
    default:
        /* Unknown kind: registering an empty event mask would be useless. */
        return -1;
    }

    item->fd = fd;
    item->arg = arg;
    ev.data.ptr = item;  /* lets the event loop get back to the item */

    if (item->events == NOSET_CB) {
        /* Not registered yet: add the fd to the epoll set. */
        if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
            return -1;
        }
        item->events = event;
    } else if (item->events != event) {
        /* Already registered for a different kind: switch the event mask. */
        if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_MOD failed\n");
            return -1;
        }
        item->events = event;
    }

    return 0;
}

/*
 * Remove fd from the epoll set and mark its item as unregistered.
 *
 * Typical call: nreactor_del_event(fd, NULL, 0, NULL);
 * cb and event are unused; arg is only copied into the epoll_event passed
 * to epoll_ctl.
 *
 * Returns 0 on success, -1 if the reactor is unavailable, fd is out of
 * range, or epoll_ctl fails. The item is marked unregistered either way.
 */
int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg)
{
    (void)cb;
    (void)event;

    struct reactor *r = getInstance();
    if (r == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT)
        return -1;

    struct epoll_event ev = {0};
    ev.data.ptr = arg;

    int rc = epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);

    /* Reset so a future connection reusing this fd number is added afresh. */
    r->head->items[fd].events = NOSET_CB;

    return rc < 0 ? -1 : 0;
}

/*
 * EPOLLOUT handler: send the bytes pending in the item's sbuffer.
 *
 * A partial send keeps the unsent tail at the front of sbuffer and stays
 * registered for writing; a complete send switches the fd back to reading.
 * A hard send error unregisters and closes the fd.
 *
 * Returns 0 when the connection is still usable, -1 otherwise.
 */
int write_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct reactor *R = getInstance();
    if (R == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT)
        return -1;

    struct nitem *item = &R->head->items[fd];

    ssize_t sent = send(fd, item->sbuffer, (size_t)item->slength, 0);
    if (sent < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
            /* Transient: try again on the next writable notification. */
            nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
            return 0;
        }
        /* Real error: retrying would fail on every loop iteration. */
        nreactor_del_event(fd, NULL, 0, NULL);
        close(fd);
        return -1;
    }

    if (sent < item->slength) {
        /* Keep only the unsent tail so the next call does not resend data. */
        memmove(item->sbuffer, item->sbuffer + sent,
                (size_t)(item->slength - sent));
        item->slength -= (int)sent;
        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    } else {
        item->slength = 0;
        nreactor_set_event(fd, read_callback, READ_CB, NULL);
    }

    return 0;
}

/*
 * EPOLLIN handler for a connection fd (simple echo).
 *
 * This is the level-triggered (LT) variant: one recv per notification. An
 * edge-triggered variant would have to keep calling recv until it fails.
 * Received bytes are copied to sbuffer and the fd is switched to writing.
 * End of stream or a hard recv error unregisters and closes the fd.
 *
 * Returns 0 on success or orderly shutdown, -1 on error.
 */
int read_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct reactor *R = getInstance();
    if (R == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT)
        return -1;

    struct nitem *item = &R->head->items[fd];

    ssize_t received = recv(fd, item->rbuffer, BUFFER_LENGTH, 0);
    if (received < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
            return 0;  /* nothing to read right now; stay registered */
        nreactor_del_event(fd, NULL, 0, NULL);
        close(fd);
        return -1;
    }

    if (received == 0) {
        /* Peer closed the connection (FIN). */
        nreactor_del_event(fd, NULL, 0, NULL);
        close(fd);
        return 0;
    }

    memcpy(item->sbuffer, item->rbuffer, (size_t)received);
    item->slength = (int)received;

    /* The buffer is not NUL-terminated, so bound the print by its length. */
    printf("readcb: %.*s\n", (int)received, (const char *)item->sbuffer);

    nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    return 0;
}

/*
 * EPOLLIN handler for the listening fd: accept one connection and register
 * it for reading.
 *
 * Returns 0 on success, -1 if accept fails or the new fd cannot be
 * registered (in which case the new fd is closed).
 */
int accept_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct sockaddr_in client;
    socklen_t len = sizeof(client);

    int connfd = accept(fd, (struct sockaddr *)&client, &len);
    if (connfd == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return -1;
    }

    /* The item array is indexed by fd, so larger fds cannot be tracked. */
    if (connfd >= MAX_EPOLL_EVENT ||
        nreactor_set_event(connfd, read_callback, READ_CB, NULL) < 0) {
        close(connfd);
        return -1;
    }

    return 0;
}

/*
 * Create a TCP socket bound to INADDR_ANY:port and start listening.
 *
 * Returns the listening fd on success, or -1 on failure. 0 is not used as
 * the failure value because it is itself a valid descriptor. The socket is
 * closed on every failure path.
 */
int init_server(int port)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return -1;
    }

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons((uint16_t)port);

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return -1;
    }

    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return -1;
    }

    return listenfd;
}

/*
 * Initialise a reactor: create the epoll instance and allocate one zeroed
 * block of MAX_EPOLL_EVENT items.
 *
 * Returns 0 on success, -1 if r is NULL, -2 if the epoll instance or the
 * memory could not be obtained. On failure *r is left untouched.
 */
int init_reactor(struct reactor *r)
{
    if (r == NULL)
        return -1;

    /* The size argument is only a hint but must be greater than zero. */
    int epfd = epoll_create(1);
    if (epfd < 0)
        return -2;

    /* calloc zero-fills, so every item starts with events == NOSET_CB and
     * NULL callbacks. The casts keep the listing valid as C++ too. */
    struct itemblock *block = (struct itemblock *)calloc(1, sizeof *block);
    if (block == NULL) {
        close(epfd);
        return -2;
    }

    block->items = (struct nitem *)calloc(MAX_EPOLL_EVENT, sizeof *block->items);
    if (block->items == NULL) {
        free(block);
        close(epfd);
        return -2;
    }
    block->next = NULL;

    /* Publish the resources only once everything has succeeded. */
    r->epfd = epfd;
    r->head = block;
    return 0;
}

/*
 * Event loop: block in epoll_wait and dispatch every ready fd to the
 * callback registered for it.
 *
 * listenfd - the listening socket; events on it go to the accept callback,
 *            events on any other fd go to the read and/or write callback.
 *
 * Runs until epoll_wait fails with an error other than EINTR, in which case
 * it returns -1. Also returns -1 if the reactor is unavailable.
 */
int reactor_loop(int listenfd)
{
    struct reactor *R = getInstance();
    if (R == NULL)
        return -1;

    struct epoll_event events[POLL_SIZE] = {0};

    while (1) {
        int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
        if (nready == -1) {
            if (errno == EINTR)
                continue;  /* interrupted by a signal: just wait again */
            /* Any other error would recur immediately, so stop looping. */
            printf("epoll_wait failed: %s(errno: %d)\n", strerror(errno), errno);
            return -1;
        }

        for (int i = 0; i < nready; i++) {
            /* data.ptr was set to the fd's item at registration time. */
            struct nitem *item = (struct nitem *)events[i].data.ptr;
            int connfd = item->fd;

            if (connfd == listenfd) {
                if (item->acceptcb != NULL)
                    item->acceptcb(listenfd, 0, NULL);
                continue;
            }

            if ((events[i].events & EPOLLIN) && item->readcb != NULL)
                item->readcb(connfd, 0, NULL);

            if ((events[i].events & EPOLLOUT) && item->writecb != NULL)
                item->writecb(connfd, 0, NULL);
        }
    }
}

/*
 * Start an echo server on port 9999 and run the reactor loop.
 * Exits with status 1 if the server cannot be set up or the loop fails.
 */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    int listenfd = init_server(9999);
    if (listenfd < 0)
        return 1;

    if (nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL) < 0) {
        close(listenfd);
        return 1;
    }

    return reactor_loop(listenfd) < 0 ? 1 : 0;
}

到此这篇关于Reactor原理与实现的文章就介绍到这了,更多相关Reactor原理内容请搜索易知道(ezd.cc)以前的文章或继续浏览下面的相关文章希望大家以后多多支持易知道(ezd.cc)!

推荐阅读