Ceph OSD的心跳机制分析

心跳机制在Ceph中扮演着非常重要的角色:所有OSD之间都需要通过心跳来确认各个OSD的状态,以便在OSD出现失联、Crash等情况时能被及时地发现,从而摘除故障OSD、触发数据重平衡等,保证数据的安全性。

所以弄明白当前Ceph的心跳机制,理顺OSD从故障到被集群踢出的流程是十分必要的。

心跳初始化 & 心跳发送

首先我们从Ceph OSD进程的启动main函数开始,代码在src/ceph_osd.cc

// Excerpt of the ceph-osd entry point: creates the four heartbeat Messengers,
// binds them, constructs the OSD object and calls OSD::init().
// Lines marked "// ..." stand for code omitted from this excerpt.
int main(int argc, const char **argv)
{
// ...

// Create four heartbeat Messengers: a client and a server for the back side
// (created with cluster_msg_type) and for the front side (created with
// public_msg_type). All four are created with the Messenger::HEARTBEAT argument.
  Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msg_type,
				entity_name_t::OSD(whoami), "hb_back_client",
				nonce, Messenger::HEARTBEAT);
  Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msg_type,
				entity_name_t::OSD(whoami), "hb_front_client",
				nonce, Messenger::HEARTBEAT);
  Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msg_type,
				entity_name_t::OSD(whoami), "hb_back_server",
				nonce, Messenger::HEARTBEAT);
  Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msg_type,
				entity_name_t::OSD(whoami), "hb_front_server",
				nonce, Messenger::HEARTBEAT);

// ...

// Bind the heartbeat Messengers. The front pair uses a copy of public_addrs and
// the back pair a copy of cluster_addrs, each with every port reset to 0
// (presumably so the actual port is chosen at bind time -- TODO confirm).
// The server is bound to the whole address vector, the client to its front()
// address; any bind failure exits via forker.exit(1).
  entity_addrvec_t hb_front_addrs = public_addrs;
  for (auto& a : hb_front_addrs.v) {
    a.set_port(0);
  }
  if (ms_hb_front_server->bindv(hb_front_addrs) < 0)
    forker.exit(1);
  if (ms_hb_front_client->client_bind(hb_front_addrs.front()) < 0)
    forker.exit(1);

  entity_addrvec_t hb_back_addrs = cluster_addrs;
  for (auto& a : hb_back_addrs.v) {
    a.set_port(0);
  }
  if (ms_hb_back_server->bindv(hb_back_addrs) < 0)
    forker.exit(1);
  if (ms_hb_back_client->client_bind(hb_back_addrs.front()) < 0)
    forker.exit(1);

// ...

// Construct the OSD instance, handing it the four heartbeat Messengers
// together with the other messengers, the store and the paths.
  osdptr = new OSD(g_ceph_context,
		   store,
		   whoami,
		   ms_cluster,
		   ms_public,
		   ms_hb_front_client,
		   ms_hb_back_client,
		   ms_hb_front_server,
		   ms_hb_back_server,
		   ms_objecter,
		   &mc,
		   data_path,
		   journal_path);

// ...

// The heartbeat has not been started at this point; the actual heartbeat
// initialization happens inside OSD::init().
  err = osdptr->init();
}

初始化心跳相关逻辑在OSD::init()方法里,我们转到src/osd/OSD.cc

// Excerpt of OSD::init() showing only the heartbeat-related steps.
// Lines marked "// ..." stand for code omitted from this excerpt.
int OSD::init()
{
// ...

// Register heartbeat_dispatcher on all four heartbeat Messengers. The same
// dispatcher handles both replies to our own pings and pings sent to us by
// other OSDs (the handling itself is in OSD::heartbeat_dispatch()).
  hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
  hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
  hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
  hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);

// ...

// Actually start the heartbeat: create the heartbeat thread,
// named "osd_srv_heartbt".
  heartbeat_thread.create("osd_srv_heartbt");
}

继续,看看heartbeat_thread的定义是什么样的,代码在src/osd/OSD.h:

struct T_Heartbeat : public Thread {
  OSD *osd;
  explicit T_Heartbeat(OSD *o) : osd(o) {}
  void *entry() override {
    osd->heartbeat_entry();
    return 0;
  }
} heartbeat_thread;

很简单,就是调用osd->heartbeat_entry(),那就继续看OSD::heartbeat_entry()做了哪些事,同样实现在src/osd/OSD.cc

/**
 * Body of the heartbeat thread.
 *
 * Holds heartbeat_lock and, until heartbeat_stop is set, repeatedly calls
 * heartbeat() and then waits on heartbeat_cond before the next round.
 * Returns immediately whenever is_stopping() reports true.
 *
 * The wait is osd_heartbeat_interval when "debug_disable_randomized_ping"
 * is set; otherwise it is 0.5 plus a random tenth (0.0 .. 0.9) of the
 * interval.
 */
void OSD::heartbeat_entry()
{
  std::unique_lock hb_lock(heartbeat_lock);
  if (is_stopping())
    return;

  while (!heartbeat_stop) {
    // One round of pings to all current heartbeat peers.
    heartbeat();

    // Work out how long to sleep before the next round.
    const float interval = (float)cct->_conf->osd_heartbeat_interval;
    const bool fixed_wait =
      cct->_conf.get_val<bool>("debug_disable_randomized_ping");
    double wait = fixed_wait
      ? interval
      : .5 + ((float)(rand() % 10)/10.0) * interval;

    auto pause = ceph::make_timespan(wait);
    dout(30) << "heartbeat_entry sleeping for " << wait << dendl;
    heartbeat_cond.wait_for(hb_lock, pause);

    if (is_stopping())
      return;
    dout(30) << "heartbeat_entry woke up" << dendl;
  }
}

// Excerpt of OSD::heartbeat(): one heartbeat round. It refreshes the load
// average statistics, sends an MOSDPing::PING to every heartbeat peer and,
// if there are no peers at all, asks for a newer osdmap.
// Lines marked "// ..." stand for code omitted from this excerpt
// (delta_ub, used below, is defined in omitted code).
void OSD::heartbeat()
{
// ...

  // Sample the current system load average.
  // daily_loadavg is maintained as a running average over n_samples samples,
  // where n_samples is 86400 divided by the heartbeat interval (at least 1),
  // i.e. roughly the number of heartbeat rounds in a day -- it is an average,
  // not an accumulated sum.
  // get CPU load avg
  double loadavgs[1];
  int hb_interval = cct->_conf->osd_heartbeat_interval;
  int n_samples = 86400;
  if (hb_interval > 1) {
    n_samples /= hb_interval;
    if (n_samples < 1)
      n_samples = 1;
  }

  // Only update the statistics when getloadavg() actually returned one sample.
  if (getloadavg(loadavgs, 1) == 1) {
    logger->set(l_osd_loadavg, 100 * loadavgs[0]);
    daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;
    dout(30) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
  }

// ...

  // deadline = now + osd_heartbeat_grace; it is not used in the part of the
  // function shown here.
  utime_t now = ceph_clock_now();
  auto mnow = service.get_mnow();
  utime_t deadline = now;
  deadline += cct->_conf->osd_heartbeat_grace;

  // Walk every peer that needs a heartbeat.
  // send heartbeats
  for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
       i != heartbeat_peers.end();
       ++i) {
  
  // ...

    // Send an MOSDPing::PING over the peer's back connection.
    i->second.con_back->send_message(
      new MOSDPing(monc->get_fsid(),
		   service.get_osdmap_epoch(),
		   MOSDPing::PING,
		   now,
		   mnow,
		   mnow,
		   service.get_up_epoch(),
		   cct->_conf->osd_heartbeat_min_size,
		   delta_ub));

    // If the peer also has a front connection, send the same ping over it too.
    if (i->second.con_front)
      i->second.con_front->send_message(
	new MOSDPing(monc->get_fsid(),
		     service.get_osdmap_epoch(),
		     MOSDPing::PING,
		     now,
		     mnow,
		     mnow,
		     service.get_up_epoch(),
		     cct->_conf->osd_heartbeat_min_size,
		     delta_ub));
  }

  logger->set(l_osd_hb_to, heartbeat_peers.size());

  // No heartbeat peers at all: if we are active and more than
  // osd_mon_heartbeat_interval has passed since last_mon_heartbeat,
  // subscribe to the next osdmap epoch to look for a new map.
  // hmm.. am i all alone?
  dout(30) << "heartbeat lonely?" << dendl;
  if (heartbeat_peers.empty()) {
    if (now - last_mon_heartbeat > cct->_conf->osd_mon_heartbeat_interval && is_active()) {
      last_mon_heartbeat = now;
      dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
      osdmap_subscribe(get_osdmap_epoch() + 1, false);
    }
  }

  dout(30) << "heartbeat done" << dendl;
}

心跳对象确定

心跳逻辑还是比较好理解的,基本就是个for循环,但是有个小细节,就是heartbeat_peers哪来的?哪些OSD会被放入到这个心跳列表?是所有的OSD么?答案是否定的:列表由与本OSD的PG相关的OSD、OSDMap中前后相邻的up OSD,以及按子树层级(例如host)随机挑选的若干up OSD组成。这个列表的维护在OSD::maybe_update_heartbeat_peers():


// Excerpt of OSD::maybe_update_heartbeat_peers(): rebuilds heartbeat_peers
// from the PGs' heartbeat peers, the neighbouring up OSDs and a random
// selection of up OSDs, drops peers that are no longer up, then grows or
// trims the set around osd_heartbeat_min_peers.
// Lines marked "// ..." stand for code omitted from this excerpt.
void OSD::maybe_update_heartbeat_peers()
{
// ...

  // First, when active, add every heartbeat peer reported by each of our PGs,
  // provided the osdmap says that peer is up.
  // build heartbeat from set
  if (is_active()) {
    vector<PGRef> pgs;
    _get_pgs(&pgs);
    for (auto& pg : pgs) {
      pg->with_heartbeat_peers([&](int peer) {
	  if (get_osdmap()->is_up(peer)) {
	    _add_heartbeat_peer(peer);
	  }
	});
    }
  }

  // Then look at the osdmap and collect the next and the previous up OSD
  // relative to ourselves into `want`.
  // include next and previous up osds to ensure we have a fully-connected set
  set<int> want, extras;
  const int next = get_osdmap()->get_next_up_osd_after(whoami);
  if (next >= 0)
    want.insert(next);
  int prev = get_osdmap()->get_previous_up_osd_before(whoami);
  if (prev >= 0 && prev != next)
    want.insert(prev);

  // Based on configuration, additionally pick random up OSDs from the osdmap;
  // the count is the larger of mon_osd_min_down_reporters and
  // osd_heartbeat_min_peers, and the results are added to `want`.
  // make sure we have at least **min_down** osds coming from different
  // subtree level (e.g., hosts) for fast failure detection.
  auto min_down = cct->_conf.get_val<uint64_t>("mon_osd_min_down_reporters");
  auto subtree = cct->_conf.get_val<string>("mon_osd_reporter_subtree_level");
  auto limit = std::max(min_down, (uint64_t)cct->_conf->osd_heartbeat_min_peers);
  get_osdmap()->get_random_up_osds_by_subtree(
    whoami, subtree, limit, want, &want);

  // Record every wanted OSD in `extras` and add it as a heartbeat peer.
  for (set<int>::iterator p = want.begin(); p != want.end(); ++p) {
    dout(10) << " adding neighbor peer osd." << *p << dendl;
    extras.insert(*p);
    _add_heartbeat_peer(*p);
  }

  // Re-check the whole peer list: remove any peer that is not up. The iterator
  // is advanced before _remove_heartbeat_peer() is called on the saved id.
  // Peers whose recorded epoch is older than the current osdmap epoch are
  // noted in `extras`.
  // remove down peers; enumerate extras
  map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
  while (p != heartbeat_peers.end()) {
    if (!get_osdmap()->is_up(p->first)) {
      int o = p->first;
      ++p;
      _remove_heartbeat_peer(o);
      continue;
    }
    if (p->second.epoch < get_osdmap_epoch()) {
      extras.insert(p->first);
    }
    ++p;
  }

  // Finally adjust the number of heartbeat peers according to configuration.
  // Too few: walk the up OSDs starting at `next`, adding ones not already in
  // extras/want (and not ourselves) until osd_heartbeat_min_peers is reached
  // or the walk comes back to `next`.
  // too few?
  for (int n = next; n >= 0; ) {
    if ((int)heartbeat_peers.size() >= cct->_conf->osd_heartbeat_min_peers)
      break;
    if (!extras.count(n) && !want.count(n) && n != whoami) {
      dout(10) << " adding random peer osd." << n << dendl;
      extras.insert(n);
      _add_heartbeat_peer(n);
    }
    n = get_osdmap()->get_next_up_osd_after(n);
    if (n == next)
      break;  // came full circle; stop
  }

  // Too many: while above osd_heartbeat_min_peers, remove entries of `extras`
  // that are not in `want`.
  // too many?
  for (set<int>::iterator p = extras.begin();
       (int)heartbeat_peers.size() > cct->_conf->osd_heartbeat_min_peers && p != extras.end();
       ++p) {
    if (want.count(*p))
      continue;
    _remove_heartbeat_peer(*p);
  }

  dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers, extras " << extras << dendl;

  // For every pending failure report whose OSD is no longer a heartbeat peer,
  // call send_still_alive() for it and drop the pending entry.
  // clean up stale failure pending
  for (auto it = failure_pending.begin(); it != failure_pending.end();) {
    if (heartbeat_peers.count(it->first) == 0) {
      send_still_alive(get_osdmap_epoch(), it->first, it->second.second);
      failure_pending.erase(it++);
    } else {
      it++;
    }
  }
}

心跳接收

在前面OSD初始化的部分,我们已经看到4个心跳相关的Messenger都注册了同一个回调heartbeat_dispatcher,用于处理收到的消息。这是封装后的dispatcher,最终会执行osd->heartbeat_dispatch(m),这个heartbeat_dispatch也定义在src/osd/OSD.cc:

// Excerpt of OSD::heartbeat_dispatch(): dispatches a message received on a
// heartbeat Messenger according to its type.
// Lines marked "// ..." stand for code omitted from this excerpt; the rest of
// the switch (including its closing brace) is not shown here.
bool OSD::heartbeat_dispatch(Message *m)
{
  dout(30) << "heartbeat_dispatch " << m << dendl;
  switch (m->get_type()) {

// ...

// A MSG_OSD_PING message is handed on to handle_osd_ping().
  case MSG_OSD_PING:
    handle_osd_ping(static_cast<MOSDPing*>(m));
    break;

  return true;
}

// Excerpt of OSD::handle_osd_ping(): handles one MOSDPing according to m->op.
//   PING       - reply with PING_REPLY; in the branch shown, reply YOU_DIED
//                when the sender is unknown or marked down in our map.
//   PING_REPLY - update the peer's last-receive stamps and cancel failure
//                reports once the peer is healthy again.
//   YOU_DIED   - subscribe to the next osdmap epoch.
// Lines marked "// ..." stand for code omitted from this excerpt. Locals such
// as con, from, now, mnow, curmap and sender_delta_ub, and the acquisition of
// heartbeat_lock, belong to the omitted code.
void OSD::handle_osd_ping(MOSDPing *m)
{
    // ...

    switch (m->op)
    {
    // Received a PING.
    case MOSDPing::PING:
    {

        // Send the reply, echoing the sender's ping stamps.
        Message *r = new MOSDPing(monc->get_fsid(),
                                  curmap->get_epoch(),
                                  MOSDPing::PING_REPLY,
                                  m->ping_stamp,
                                  m->mono_ping_stamp,
                                  mnow,
                                  service.get_up_epoch(),
                                  cct->_conf->osd_heartbeat_min_size,
                                  sender_delta_ub);
        con->send_message(r);
        // ... (the `if` that the following `else if` belongs to is omitted)
        else if (!curmap->exists(from) ||
                 curmap->get_down_at(from) > m->map_epoch)
        {
            /*
             * The sender does not exist in our map, or its down_at value in
             * our map is greater than the map epoch carried by the ping:
             * send it an MOSDPing::YOU_DIED.
             * tell them they have died
             */
            Message *r = new MOSDPing(monc->get_fsid(),
                                      curmap->get_epoch(),
                                      MOSDPing::YOU_DIED,
                                      m->ping_stamp,
                                      m->mono_ping_stamp,
                                      mnow,
                                      service.get_up_epoch(),
                                      cct->_conf->osd_heartbeat_min_size);
            con->send_message(r);
        }
    }
    break;

    /* Received a reply to one of our PINGs. */
    case MOSDPing::PING_REPLY:
    {
        map<int, HeartbeatInfo>::iterator i = heartbeat_peers.find(from);

        // Only act if the sender is in our heartbeat peer list.
        // The last-receive stamp is updated according to the connection the
        // reply arrived on: with only a back connection both stamps are
        // updated together; with separate front and back connections each
        // one updates its own stamp.
        if (i != heartbeat_peers.end())
        {
            // Look up the ping this reply acknowledges by its ping_stamp.
            auto acked = i->second.ping_history.find(m->ping_stamp);
            if (acked != i->second.ping_history.end())
            {
                // Counter of replies still outstanding for that ping.
                int &unacknowledged = acked->second.second;
                if (con == i->second.con_back)
                {

                    i->second.last_rx_back = now;
                    ceph_assert(unacknowledged > 0);
                    --unacknowledged;
                    /* if there is no front con, set both stamps. */
                    if (i->second.con_front == NULL)
                    {
                        i->second.last_rx_front = now;
                        ceph_assert(unacknowledged > 0);
                        --unacknowledged;
                    }
                }
                else if (con == i->second.con_front)
                {
                    dout(25) << "handle_osd_ping got reply from osd." << from
                             << " first_tx " << i->second.first_tx
                             << " last_tx " << i->second.last_tx
                             << " last_rx_back " << i->second.last_rx_back
                             << " last_rx_front " << i->second.last_rx_front
                             << " -> " << now
                             << dendl;
                    i->second.last_rx_front = now;
                    ceph_assert(unacknowledged > 0);
                    --unacknowledged;
                }

// ... (some history is also recorded here; not analysed in this excerpt)

                /* If the peer is healthy now, clear any earlier failure state. */
                if (i->second.is_healthy(now))
                {
                    /* Cancel false reports */
                    auto failure_queue_entry = failure_queue.find(from);
                    if (failure_queue_entry != failure_queue.end())
                    {
                        dout(10) << "handle_osd_ping canceling queued "
                                 << "failure report for osd." << from << dendl;
                        failure_queue.erase(failure_queue_entry);
                    }

                    // A report already recorded as pending is cancelled via
                    // send_still_alive() before the entry is dropped.
                    auto failure_pending_entry = failure_pending.find(from);
                    if (failure_pending_entry != failure_pending.end())
                    {
                        dout(10) << "handle_osd_ping canceling in-flight "
                                 << "failure report for osd." << from << dendl;
                        send_still_alive(curmap->get_epoch(),
                                         from,
                                         failure_pending_entry->second.second);
                        failure_pending.erase(failure_pending_entry);
                    }
                }
            }
            else
            {
                /* old replies, deprecated by newly sent pings. */
                dout(10) << "handle_osd_ping no pending ping(sent at " << m->ping_stamp
                         << ") is found, treat as covered by newly sent pings "
                         << "and ignore"
                         << dendl;
            }
        }

    }
    break;

    /* Received MOSDPing::YOU_DIED: subscribe to the next osdmap epoch. */
    case MOSDPing::YOU_DIED:
        dout(10) << "handle_osd_ping " << m->get_source_inst()
                 << " says i am down in " << m->map_epoch << dendl;
        osdmap_subscribe(curmap->get_epoch() + 1, false);
        break;
    }

    // Release the heartbeat lock (taken in omitted code) and drop our
    // reference to the message.
    heartbeat_lock.unlock();
    m->put();
}

超时检测和上报

心跳流程分析得差不多了,接下来看心跳的超时检测和向Monitor的上报。

心跳的超时检测是在OSD::heartbeat_check()方法里完成的:

void OSD::heartbeat_check()
{
    ceph_assert(ceph_mutex_is_locked(heartbeat_lock));
    utime_t now = ceph_clock_now();

    // check for incoming heartbeats (move me elsewhere?)
    for (map<int, HeartbeatInfo>::iterator p = heartbeat_peers.begin();
         p != heartbeat_peers.end();
         ++p) {
        // 如果一个心跳还没发呢,先跳过
        if (p->second.first_tx == utime_t()) {
            dout(25) << "heartbeat_check we haven't sent ping to osd." << p->first
                     << " yet, skipping" << dendl;
            continue;
        }

        // 如果有发生超时的情况
        if (p->second.is_unhealthy(now)) {
            utime_t oldest_deadline = p->second.ping_history.begin()->second.first;
            if (p->second.last_rx_back == utime_t() ||
                p->second.last_rx_front == utime_t()) {
                // fail
                // 一个返回都没收到,扔failure_queue队列里
                failure_queue[p->first] = p->second.first_tx;
            } else {
                // fail
                // 收到过返回,但是依然超时了,扔failure_queue队列里
                failure_queue[p->first] = std::min(p->second.last_rx_back, p->second.last_rx_front);
            }
        }
    }
}

这个检测还是比较简单的,那么什么时候上报到Monitor呢?我们慢慢来:

OSD::init()阶段,做了这样一件事:

// Excerpt of OSD::init() showing how the periodic tick that runs without the
// OSD lock is armed.
// Lines marked "// ..." stand for code omitted from this excerpt.
int OSD::init()
{
  // ...

  tick_timer_without_osd_lock.init();

  // ...

  {
    std::lock_guard l(tick_timer_lock);
    // Schedule a C_Tick_WithoutOSDLock to run after get_tick_interval().
    tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
						new C_Tick_WithoutOSDLock(this));
  }
}
/// Context that forwards to OSD::tick_without_osd_lock() when finished.
class OSD::C_Tick_WithoutOSDLock : public Context {
public:
  explicit C_Tick_WithoutOSDLock(OSD *owner)
    : osd{owner}
  {}

  /// The work is done by OSD::tick_without_osd_lock(); r is not used.
  void finish(int r) override
  {
    osd->tick_without_osd_lock();
  }

private:
  OSD *osd;  ///< OSD to tick
};

OSD::tick_without_osd_lock()中,做了很多事情,其中就包括OSD心跳超时的检测和向Monitor的上报:

// Excerpt of OSD::tick_without_osd_lock(): the periodic tick. While the OSD
// is active or waiting for healthy it checks heartbeat timeouts and, when due,
// reports to the monitor; it then re-arms itself.
// Lines marked "// ..." stand for code omitted from this excerpt.
void OSD::tick_without_osd_lock()
{
  // ...

  if (is_active() || is_waiting_for_healthy()) {
    {
      std::lock_guard l{heartbeat_lock};
      // Check for heartbeat timeouts (heartbeat_lock held only for this call).
      heartbeat_check();
    }
    // NOTE(review): the unlock matching this lock_shared() is not visible in
    // this excerpt.
    map_lock.lock_shared();
    std::lock_guard l(mon_report_lock);

    // mon report?
    // Report to the monitor when a fullness update is needed or when more
    // than osd_mon_report_interval has passed since the last report.
    utime_t now = ceph_clock_now();
    if (service.need_fullness_update() ||
	now - last_mon_report > cct->_conf->osd_mon_report_interval) {
      last_mon_report = now;
      send_full_update();
      // Report the OSDs queued as failed.
      send_failures();
    }
  }

  // Schedule the next tick.
  tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
					      new C_Tick_WithoutOSDLock(this));
}

再看看OSD::send_failures():

// Excerpt of OSD::send_failures(): drains failure_queue, sending one
// MOSDFailure to the monitor for every queued OSD that does not already have
// a pending report.
// Lines marked "// ..." stand for code omitted from this excerpt (now and
// osdmap are defined there).
void OSD::send_failures()
{
  // ...

  while (!failure_queue.empty()) {
      int osd = failure_queue.begin()->first;
      // Skip OSDs whose failure report is already pending.
      if (!failure_pending.count(osd)) {
          // Time elapsed between the queued timestamp and now, truncated to int.
          int failed_for = (int)(double)(now - failure_queue.begin()->second);
          // Send the monitor an MOSDFailure carrying the failed OSD's id,
          // its addresses, the elapsed time and the current osdmap epoch.
          monc->send_mon_message(
              new MOSDFailure(
                  monc->get_fsid(),
                  osd,
                  osdmap->get_addrs(osd),
                  failed_for,
                  osdmap->get_epoch()));
          // Remember the report as pending: (queued timestamp, addresses).
          failure_pending[osd] = make_pair(failure_queue.begin()->second,
                                            osdmap->get_addrs(osd));
      }
      // The entry leaves the queue whether or not a report was sent.
      failure_queue.erase(osd);
  }
}

到此,OSD针对心跳的部分就基本结束了。剩下来就要看Monitor收到MOSDFailure消息之后怎么处理了。