小编给大家分享一下redis多路复用技术的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
redis 是一个单线程却性能非常好的内存数据库, 主要用来作为缓存系统。 redis 采用网络IO多路复用技术来保证在多连接的时候, 系统的高吞吐量。
redis的多路复用, 提供了select, epoll, evport, kqueue几种选择,在编译的时候来选择一种。
我们一般运行的服务器都是LINUX系统上面, 并且我对Solaris和Mac系统不是很了解, 我们这里重点比较一下select、poll和epoll 3种多路复用的差异。
select: 单个进程所能打开的最大连接数有FD_SETSIZE宏定义, 其大小为1024或者2048; FD数目剧增后, 会带来性能问题;消息传递从内核到与到用户空间,需要copy数据;
性能问题:
(1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
(2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
poll: 基本上与select一样, 不通点在于没有FD数目的限制, 因为底层实现不是一个数组, 而是链表;
select 在LINUX的接口:
#include <sys/select.h>/* According to earlier standards */#include <sys/time.h>#include <sys/types.h>#include <unistd.h>int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);void FD_CLR(int fd, fd_set *set);int FD_ISSET(int fd, fd_set *set);void FD_SET(int fd, fd_set *set);void FD_ZERO(fd_set *set);
select 函数的参数:
- nfds:fd_set的FD的个数, 采用位图的方式记录fd_set集合的FD状态;
- readfds: fd_set 集合中来监控有哪些读操作没有被block, 如果有可读,select
- writefds:fd_set 集合中来监控有哪些写操作没有被block;
- exceptfds: fd_set 集合中来监控有哪些except操作没有被block;
- timeout: FD z最小被block的时间, 如果timeout的2个field都是0, 会立刻返回, 如果该参数是NULL, 会一直block;
select如果有一个或者多个读操作, 写操作, except操作不被block, 返回大于1的数值; 若果没有不被block的FD, 但是某些FD block超时, 返回0; 如果错误出现, 返回-1;
FD_XXX函数, 是添加、删除、清空以及判断fd_set的工具函数。
select的pseudo 代码:
while (1){ int ret = select(streams[]); if (ret > 0 ) { for i in streams[] { if i has data { read or write streams[i]; } } } else if (ret == 0) { handle timeout FDs; }else { handle error }}
epoll的LINUX的接口:
#include <sys/epoll.h>//预定义的EVENTenum EPOLL_EVENTS { EPOLLIN = 0x001,#define EPOLLIN EPOLLIN EPOLLPRI = 0x002,#define EPOLLPRI EPOLLPRI EPOLLOUT = 0x004,#define EPOLLOUT EPOLLOUT EPOLLRDNORM = 0x040,#define EPOLLRDNORM EPOLLRDNORM EPOLLRDBAND = 0x080,#define EPOLLRDBAND EPOLLRDBAND EPOLLWRNORM = 0x100,#define EPOLLWRNORM EPOLLWRNORM EPOLLWRBAND = 0x200,#define EPOLLWRBAND EPOLLWRBAND EPOLLMSG = 0x400,#define EPOLLMSG EPOLLMSG EPOLLERR = 0x008,#define EPOLLERR EPOLLERR EPOLLHUP = 0x010,#define EPOLLHUP EPOLLHUP EPOLLRDHUP = 0x2000,#define EPOLLRDHUP EPOLLRDHUP EPOLLWAKEUP = 1u << 29,#define EPOLLWAKEUP EPOLLWAKEUP EPOLLONESHOT = 1u << 30,#define EPOLLONESHOT EPOLLONESHOT EPOLLET = 1u << 31#define EPOLLET EPOLLET };int epoll_create(int size);//创建epoll对象并回传其描述符。int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);//将要交由内核管控的文件描述符加入epoll对象并设置触发条件。int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);//等待已注册之事件被触发或计时终了。
epoll提供edge-triggered及level-triggered模式。在edge-trigger模式中,epoll_wait仅会在新的事件首次被加入epoll 对象时返回;于level-triggered模式下,epoll_wait在事件状态未变更前将不断被触发。
举例来说,倘若有一个已经于epoll注册之管线接获数据,epoll_wait将返回,并发出数据读取的信号。现假设缓冲器的数据仅有部分被读取并处理,在level-triggered模式下,任何对epoll_wait之调用都将即刻返回,直到缓冲器中的数据全部被读取;然而,在edge-triggered的情境下,epoll_wait仅会于再次接收到新数据(亦即,新数据被写入管线)时返回。
epoll的实现pseudo 代码
epollfd = epoll_create()while (1) { active_stream[] = epoll_wait(epollfd) for (i=0; i < len(active_stream[]); i++) { read or write active_stream[i] }}
接下来我们看一下, redis的多路复用如何实现的。整个redis的main函数包含如下3部分:
1、初始化Redis Server参数,这部分代码通过initServerConfig实现。
2、初始化Redis Server,这部分代码在initServer里面。
3、启动事件轮询器。
这里第一部分, 就是通过配置文件的参数来初始化server对象的参数, 和本文的主题没有太大关系这里略过。
第二部分, 包含了创建轮询器, 以及一个时间event队列, 和file event数组。
void initServer(void) { ... server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); if (server.el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", strerror(errno)); } ... /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed * expired keys and so forth. */ if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); /* Register a readable event for the pipe used to awake the event loop * when a blocked client in a module needs attention. */ if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR) { serverPanic( "Error registering the readable event for the module " "blocked clients subsystem."); }
第三部分, 是整个event loop部分:
int main() { // 第一部分 // 第二部分入口 initServer(); ... aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); aeDeleteEventLoop(server.el); return 0;}void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); }} * The function returns the number of events processed. */int aeProcessEvents(aeEventLoop *eventLoop, int flags){ * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */}static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ int mask = eventLoop->events[fd].mask & (~delmask); ee.events = 0; if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (mask != AE_NONE) { epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee); } else { /* Note, Kernel < 2.6.9 requires a non null event pointer even for * EPOLL_CTL_DEL. */ epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee); } }
以上是redis多路复用技术的示例分析的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。