Redis 使用 epoll 封装 Reactor 模型

2024-4-27|2024-4-27
麦兜
麦兜
type
Post
status
Published
date
Apr 27, 2024
slug
summary
tags
数据库
Redis
category
学习思考
password
icon

epoll

epoll 是 I/O 事件通知模型。epoll在监听事件的时候需要两个维护两个epoll_event记录集,1是用于需要监听的事件的 ,2是用于记录已就绪的监听事件。
//epoll 的结构体 epoll_event typedef union epoll_data { ... int fd; //记录文件描述符 ... } epoll_data_t; struct epoll_event { uint32_t events; //epoll监听的事件类型 (EPOLLIN、EPOLLOUT、EPOLLERR) epoll_data_t data; //应用程序数据 };

相关API

epoll_create : 创建 epoll 实例
int epoll_create(int size); //返回的是ep的fd,参数size只是参考值会自动扩容。
epoll_wait : 等待就绪的描述符,timeout=0立即返回,timeout=1阻塞式等待。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll_ctl: 将描述符改增删操作,op EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL。
int epoll_ctl(int epfd, int op, int fd,struct epoll_event *_Nullableevent);

epoll 简单使用流程

int sock_fd,conn_fd; sock_fd = socket() bind(sock_fd) listen(sock_fd) //创建epoll实例 epfd = epoll_create(EPOLL_SIZE); //创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型 ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE); //创建epoll_event变量 struct epoll_event ee //监听读事件 ee.events = EPOLLIN; //监听的文件描述符是刚创建的 listener 的 socket ee.data.fd = sock_fd; //将 listener的 socket 加入到监听列表中 epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee); while (1) { //等待返回已经就绪的描述符 n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); //遍历所有就绪的描述符 for (int i = 0; i < n; i++) { //如果是 listener 接字描述符就绪,表明有一个新客户端连接到来 if (ep_events[i].data.fd == sock_fd) { conn_fd = accept(sock_fd); //调用accept()建立连接 ee.events = EPOLLIN; ee.data.fd = conn_fd; //添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件 epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee); } else { //如果是已连接套接字描述符就绪,则可以读数据 ...//读取数据并处理 } } }

Redis 的 Reactor 模型

Reactor 的介绍:Reactor 是一个异步的编程模型。
 
Redis 的 Reactor 封装相关代码在 ae.c ae.h,在启动服务时候会创建一个事件循环体,调用aeEventLoop *aeCreateEventLoop(int setsize)方法 setzise server.maxclients+CONFIG_FDSET_INCR 客户端最大连接数(默认10000)+ 额外配置(128)。

初始化 Event Loop

aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; //用于记录回调函数指针 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); //用于记录fd的状态 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); ... //真正初始化 调用具体实现的api,可能是select 、 eopll。 if (aeApiCreate(eventLoop) == -1) goto err; ... }
typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; //具体的读事件回调函数指针 aeFileProc *wfileProc; //具体的写事件回调函数指针 void *clientData; } aeFileEvent;
//epoll 封装状态体 typedef struct aeApiState { int epfd; struct epoll_event *events; //就绪的响应集 } aeApiState; //eopll 的 aeApiCreate api 实现。 static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); .... state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); //调用 epoll_create api创建 epoll 的 fd state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ ... //epoll fd eventLoop->apidata = state; }

监听事件并绑定回调函数

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { ... aeFileEvent *fe = &eventLoop->events[fd]; //添加监听事件 调用具体实现的api,可能是select 、 eopll。 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; //记录回调函数指针 if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; ... return AE_OK; }
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; //根据op判断是新增还是修改,根据mask决定是读事件还是写事件 if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }

等待回调

void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { //aeProcessEvents 会根据状态选择性处理那些事件。 //调用 aeApiPoll 等待监听的事件就绪 aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); } }
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { ... //等待就绪的事件 numevents = aeApiPoll(eventLoop, tvp); if (!(flags & AE_FILE_EVENTS)) { numevents = 0; } ... //根据状态执行根据的回调函数 for (j = 0; j < numevents; j++) { int fd = eventLoop->fired[j].fd; aeFileEvent *fe = &eventLoop->events[fd]; int mask = eventLoop->fired[j].mask; int fired = 0; /* Number of events fired for current fd. */ if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ } /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } if (invert) { fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } ... return processed; }
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; //eopll的实现 等待监听就绪 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1); if (retval > 0) { int j; numevents = retval; //遍历就绪的事件,并标记状态 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } else if (retval == -1 && errno != EINTR) { panic("aeApiPoll: epoll_wait, %s", strerror(errno)); } return numevents; }

Redis 监听客户端

server.cmain() 方法中初始化一系列操作后会调用 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 创建一个循环体,然后执行 initListeners()方法,initListeners 会根据用户配置的端口数量监听 fd,调用 createSocketAcceptHandler()方法将 listener 的 fd 与回调函数指针绑定,如果连接的是TCP/IP Socket 回调函数就是 connSocketAcceptHandler,如果是 Unix Socket 的话回调函数是 connUnixAcceptHandler,当监听到 listener 的 fd 可读,说明有客户端请求连接,就回调执行所对应的函数处理。创建好客户端连接后继续把 client 的 fd 添加到 epoll 监听,大致流程是这样
connSocketAcceptHandler()acceptCommonHandler()createClient()
connSetReadHandler(conn,readQueryFromClient)aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn)
readQueryFromClient() 方法就是 client 的 fd 可读的就绪的回调处理函数,接下来Redis 会解析命令、查询数据库、并把结果响应回客户端。

总结

可以看到 Redis 封装的 Reactor 编程模型,事实上是在 epoll 的 api 基础上加上一些功能,比如 在 aeFileEvent 记录了相对应的 fd 要执行的函数指针,等待有就绪的fd就执行回调方法。 aeMain () 基本上是单个线程一直循环,有事件就去处理,eopll 可以返回多个 fd 的事件就绪状态而不用阻塞式的等待某个 fd 事件就绪,这让 Redis 使用单线程也可以处理很多请求连接。
Kubernetes Service参数配置不当引起OOM