小编给大家分享一下如何实现ceph SimpleMessenger模块消息的接收,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
OSD服务端消息的接收起始于OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函数
|- 358 void add_dispatcher_head(Dispatcher *d) {
|| 359 bool first = dispatchers.empty();
|| 360 dispatchers.push_front(d);
|| 361 if (d->ms_can_fast_dispatch_any())
|| 362 fast_dispatchers.push_front(d);
|| 363 if (first)
|| 364 ready(); //如果dispatcher list空,启动SimpleMessenger::ready,不为空证明SimpleMessenger已经启动了
|| 365 }
在SimpleMessenger::ready()中,启动DispatchQueue等待mqueue,如果绑定了端口就启动 accepter接收线程
76 void SimpleMessenger::ready()
- 77 {
| 78 ldout(cct,10) << "ready " << get_myaddr() << dendl;
| 79 dispatch_queue.start(); //启动DispatchQueue,等待mqueue
| 80
| 81 lock.Lock();
| 82 if (did_bind)
| 83 accepter.start();
| 84 lock.Unlock();
| 85 }
Accepter是Thread的继承类,Accepter::start()最终调用Accepter::entry(),在entry中 accept并把接收到的sd加入到Pipe类中
void *Accepter::entry()
{
...
struct pollfd pfd;
pfd.fd = listen_sd;
pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
while (!done) {
int r = poll(&pfd, 1, -1);
if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
break;
// accept
entity_addr_t addr;
socklen_t slen = sizeof(addr.ss_addr());
int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
if (sd >= 0) {
errors = 0;
ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl;
msgr->add_accept_pipe(sd); //注册一个pipe,启动读线程,从该sd中读取数据
} else {
ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
if (++errors > 4)
break;
}
}
...
return 0;
在SimpleMessenger::add_accept_pipe(int sd)中,申请一个Pipe类并把sd加入到Pipe中,开始Pipe::start_reader()
340 Pipe *SimpleMessenger::add_accept_pipe(int sd)
- 341 {
| 342 lock.Lock();
| 343 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
| 344 p->sd = sd;
| 345 p->pipe_lock.Lock();
| 346 p->start_reader();
| 347 p->pipe_lock.Unlock();
| 348 pipes.insert(p);
| 349 accepting_pipes.insert(p);
| 350 lock.Unlock();
| 351 return p;
| 352 }
Pipe类内部有一个Reader和Writer线程类,Pipe::start_reader()启动Pipe::Reader::entry(),最终启动Pipe::reader函数
134 void Pipe::start_reader()
- 135 {
| 136 assert(pipe_lock.is_locked());
| 137 assert(!reader_running);
|- 138 if (reader_needs_join) {
|| 139 reader_thread.join();
|| 140 reader_needs_join = false;
|| 141 }
| 142 reader_running = true;
| 143 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
| 144 }
|- 48 class Reader : public Thread {
|| 49 Pipe *pipe;
|| 50 public:
|| 51 explicit Reader(Pipe *p) : pipe(p) {}
|| 52 void *entry() { pipe->reader(); return 0; }
|| 53 } reader_thread;
在Pipe::reader函数中根据tag接收不同类型的消息,如果是CEPH_MSGR_TAG_MSG类型消息调用read_message接收消息,并把消息加入到mqueue中
void Pipe::reader()
{
pipe_lock.Lock();
if (state == STATE_ACCEPTING) {
accept(); //第一次进入此函数处理
assert(pipe_lock.is_locked());
}
// loop.
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {
assert(pipe_lock.is_locked());
......
......
else if (tag == CEPH_MSGR_TAG_MSG) {
ldout(msgr->cct,20) << "reader got MSG" << dendl;
Message *m = 0;
int r = read_message(&m, auth_handler.get());
pipe_lock.Lock();
if (!m) {
if (r < 0)
fault(true);
continue;
}
......
......
......
// note last received message.
in_seq = m->get_seq();
cond.Signal(); // wake up writer, to ack this
ldout(msgr->cct,10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
<< dendl;
in_q->fast_preprocess(m); //mds 、mon不会进入此函数,预处理
if (delay_thread) {
utime_t release;
if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
release = m->get_recv_stamp();
release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
}
delay_thread->queue(release, m);
} else {
if (in_q->can_fast_dispatch(m)) {
reader_dispatching = true;
pipe_lock.Unlock();
in_q->fast_dispatch(m);
pipe_lock.Lock();
reader_dispatching = false;
if (state == STATE_CLOSED ||
notify_on_dispatch_done) { // there might be somebody waiting
notify_on_dispatch_done = false;
cond.Signal();
}
} else { //mds进入此else
in_q->enqueue(m, m->get_priority(), conn_id); //把接收到的messenger加入到mqueue中
}
}
}
......
......
}
// reap?
reader_running = false;
reader_needs_join = true;
unlock_maybe_reap();
ldout(msgr->cct,10) << "reader done" << dendl;
}
在Pipe::DispatchQueue::enqueue函数中加入到mqueue中
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{
Mutex::Locker l(lock);
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
add_arrival(m);
if (priority >= CEPH_MSG_PRIO_LOW) {
mqueue.enqueue_strict(
id, priority, QueueItem(m));
} else {
mqueue.enqueue(
id, priority, m->get_cost(), QueueItem(m));
}
cond.Signal(); //唤醒dispatch_queue.start() 启动的dispatchThread,进入entry进行处理
}
看完了这篇文章,相信你对“如何实现ceph SimpleMessenger模块消息的接收”有了一定的了解,如果想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/2326998/blog/1814394