远程消息投递
游戏服务器一般来说是由承担不同功能角色一组进程的集合,这些进程可以放在同一台物理机上,也可以分布式的存在于多台通过网络互联的物理机中。游戏服务器内管理着各种可以通信的对象,分布在这些进程之中。同时这些可通信的对象与进程之间的关系是动态的,可能会按照业务需求将某个对象从物理机器A中的某个进程切换到物理机器B中的某个进程。所以在游戏服务器中的对象间通信不能直接使用ip:port::target_id这种静态的配置形式,需要在业务层封装好一个基于对象名字的虚拟通道channel消息投递机制,以应对对象的进程间迁移。同时由于游戏内的玩家可能随时上线下线,所以对于玩家的某些消息还需要额外考虑其不在线的情况
,使用持久化的数据库来确保玩家对这些持久化消息的有序接收。接下来我们将介绍一下mosaic_game中提供的面向不同应用场景的各种消息投递机制,以及相关应用组件。
在线消息投递
在线消息投递处理的是向目前在线的服务端entity,service发送rpc消息的过程。如果发送时目标不在线,则消息投递失败;如果在投递过程中目标下线,则消息投递失败。由于此类消息不保证能够可靠的通知到目标,所以此类消息一般都是用来执行客户端消息通知,不能依赖此类消息去修改entity的持久化数据。根据这个在线消息投递实现机制,又可以细分为三种子类型:
- 单播消息 只向一个目标进行消息投递
- 多播消息 向多个在线目标进行消息投递
- 广播消息 向所有在线目标进行消息投递
单播消息
在线单播消息投递是所有消息投递实现的基础,在mosaic_game中暴露出了如下接口来支持向指定的一个对象发送在线消息:
void json_server::call(std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> msg, enums::packet_cmd cur_packet_cmd, std::uint8_t cur_packet_detail_cmd)
{
m_router->push_msg(m_local_name_ptr, dest, msg, enums::packet_cmd_helper::encode(cur_packet_cmd, cur_packet_detail_cmd));
}
dest字段代表投递目标的唯一标识符,为server_id::target_id形式的字符串,代表server_id服务器上的target_id对应的发送目标,后面的::target_id某些情况下可以省略。然后msg字段就是要发送的消息的字节流,这里使用shared_ptr<const std::string>是为了避免出现字符串的拷贝操作,同时也能更好的支持多播。
参数里剩下的cur_packet_cmd与cur_packet_detail_cmd可以理解为消息类型和在这个消息下的消息子类型,编码时会将这两个字段合并为一个uint16进行处理:
enum class packet_cmd: std::uint8_t
{
server_control = 0,
client_to_game,
game_to_client,
server_rpc_msg,
server_raw_msg,
entity_msg,
actor_migrate_msg,
max,
};
struct packet_cmd_helper
{
static std::uint16_t encode(packet_cmd in_packet_cmd, std::uint8_t in_cmd_detail)
{
std::uint16_t result = std::uint16_t(in_packet_cmd);
result <<= 8;
result += in_cmd_detail;
return result;
}
static std::pair<packet_cmd, std::uint8_t> decode(std::uint16_t in_combine_cmd)
{
std::pair<packet_cmd, std::uint8_t> result;
result.second = in_combine_cmd % 256;
result.first = packet_cmd(in_combine_cmd / 256);
return result;
}
};
这个call接口只是对network_router::push_msg的一个简单转发,额外加上了当前发送者的进程标识符m_local_name_ptr,内部会根据这个dest找到合适的远程连接connection,并将消息添加到这个connection的发送队列中:
bool network_router::push_msg(std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data, std::uint16_t cmd)
{
auto cur_proxy_resource = m_anchor_collection.find_proxy_for_anchor(*dest);
if(!cur_proxy_resource)
{
m_logger->error("push_msg cant find anchor_resources from {} dest {} data {}", *from, *dest, *data);
return false;
}
auto cur_proxy_con = cur_proxy_resource->get_connection();
if (cur_proxy_con)
{
if (push_msg(cur_proxy_con, from, dest, data, cmd))
{
return true;
}
}
return cur_proxy_resource->try_push(from, dest, data, cmd);
}
从dest找到对应的connection的逻辑由anchor_collection类型负责,这个类型记录了anchor锚点到网络投递资源的映射,对外暴露了名字绑定接口来添加两者之间的映射:
bool network_router::link_anchor_to_connection(const std::string& anchor_name, const net_connection* connection)
{
auto cur_connection_iter = m_connection_resources.find(connection);
if (cur_connection_iter == m_connection_resources.end())
{
return false;
}
if(m_anchor_collection.create_resource(anchor_name, connection, cur_connection_iter->second->output_channel))
{
cur_connection_iter->second->anchors.insert(anchor_name);
return true;
}
else
{
return false;
}
}
查询的时候查找以::分割的最长前缀去匹配。举个例子来说,如果server1::target_id找不到记录的话,则继续以server_1去查找记录,这样就可以让server_1这个connection_resource去代理所有的server_1::xxx形式的rpc目标。
有些时候传入的投递地址可能会是本进程的地址,此时查找connection_resource的话会失败,因为当前并没有为本进程创建一个connection,从而导致投递消息失败,所以上层在投递消息的时候要专门为这个本地地址做过滤。这里的代码在发现dest是一个本进程地址之后,会将这个数据直接放到主循环消息队列中:
bool network_router::push_msg(std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data, std::uint16_t cmd)
{
if(dest->rfind(m_local_anchor_name, 0) == 0)
{
// 说明是本进程地址 直接推送数据到input_msg_queue
network::con_msg_task local_msg_task;
local_msg_task.first = {};
local_msg_task.second = msg_task::construct(from, dest, data, cmd);
m_input_msg_queue->push_msg(std::move(local_msg_task));
}
// 省略一些代码
}
找到connection之后的消息发送逻辑已经在之前的网络通信章节中介绍过了,读者可以回顾一下相关的内容来了解底层TCP传输细节,这里就不再介绍。当目标进程接收到了这个消息之后,主循环的on_msg回调首先解析出from,dest, msg, cmd这四个字段,然后根据cmd的类型与dest地址来做本进程的消息分发。由于具体的分发函数实现有点长,这里就只贴出space_server::on_msg中处理enums::packet_cmd::entity_msg的部分:
case enums::packet_cmd::entity_msg:
{
if(!one_msg.dest)
{
m_logger->error("dest empty while handle rpc msg {}", *one_msg.data);
return true;
}
utility::rpc_msg::call_result dispatch_result;
auto real_dest = remove_local_anchor_prefix(*one_msg.dest);
if(check_msg_forward(real_dest, one_msg))
{
return true;
}
dispatch_result = entity::entity_manager::instance().dispatch_entity_msg(real_dest, cur_cmd_detail.second, one_msg.data);
if (dispatch_result != utility::rpc_msg::call_result::suc)
{
m_logger->error("fail to dispatch raw_msg dest {} sync_cmd {} with error {}", *one_msg.dest, cur_cmd_detail.second, int(dispatch_result));
}
return true;
}
这里的remove_local_anchor_prefix相当于把server_id::entity_id形式的dest解析出entity_id部分,check_msg_forward的函数行为我们后面再介绍,剩下的逻辑就是entity_manager根据entity_id去找到对应的entity并调用on_entity_msg接口来处理本次数据:
utility::rpc_msg::call_result dispatch_entity_msg(const std::string& dest, std::uint8_t cmd, std::shared_ptr<const std::string> msg)
{
auto cur_entity = get_entity(dest);
if (!cur_entity)
{
return utility::rpc_msg::call_result::dest_not_found;
}
return cur_entity->on_entity_msg(cmd, msg);
}
这个on_entity_msg再根据消息的子类型来做格式解析,并最终执行到rpc的分发函数on_rpc_msg:
utility::rpc_msg::call_result server_entity::on_entity_msg(std::uint8_t cmd, std::shared_ptr<const std::string> msg_ptr)
{
if(cmd == std::uint8_t(enums::entity_packet::json_rpc))
{
utility::rpc_msg e_msg;
try
{
json msg = json::parse(*msg_ptr);
msg.at("cmd").get_to(e_msg.cmd);
msg.at("args").get_to(e_msg.args);
auto from_iter = msg.find("from");
if (from_iter != msg.end())
{
from_iter->get_to(e_msg.from);
}
auto err_iter = msg.find("err");
if(err_iter != msg.end())
{
err_iter->get_to(e_msg.err);
}
}
catch (std::exception& e)
{
m_logger->error("fail to decode rpc_msg {} error {}", *msg_ptr, e.what());
return utility::rpc_msg::call_result::invalid_format;
}
return on_rpc_msg(e_msg);
}
return utility::rpc_msg::call_result::rpc_not_found;
}
on_rpc_msg之后的逻辑在rpc部分已经介绍过了,这里就不重复阐述。上面贴的代码对应的是space_server上的server_entity消息投递,其实与service_server上的base_service的消息投递机制基本没有差异,两者的on_rpc_msg机制是一样的,只不过一个被entity_manager中转分发,而另外一个被service_manager中转分发。类似的还有space_server上的manager_base,用来管理一些能够接受rpc的非server_entity单例数据:
utility::rpc_msg::call_result space_server::on_server_rpc_msg(const std::string& dest, const utility::rpc_msg& cur_rpc_msg)
{
auto dispatch_result = entity::entity_manager::instance().dispatch_rpc_msg(dest, cur_rpc_msg);
if(dispatch_result != utility::rpc_msg::call_result::dest_not_found)
{
return dispatch_result;
}
return manager_base::dispatch_rpc(dest, cur_rpc_msg);
}
void manager_base::init_managers(space_server* in_space_server)
{
offline_msg_manager::instance().init(in_space_server);
email_manager::instance().init(in_space_server);
notify_manager::instance().init(in_space_server);
rank_manager::instance().init(in_space_server);
space_manager::instance().init(in_space_server);
}
不过service与server_entity在投递地址的表示上有很大的不同,server_entity的投递地址都是server_id::entity_id这个形式,而service的投递地址就只有service_type这个形式,没有绑定的server_id。这个解绑server_id的原因是为了支持service在不同的进程之间动态迁移从而实现负载均衡以及容灾,所以往特定service发消息的时候,只需要传递这个服务的名字即可作为投递地址:
void space_server::call_service( const std::string& service_name, const utility::rpc_msg& msg)
{
auto dest_server = choose_server_for_service(service_name);
if(dest_server.empty())
{
m_logger->warn("fail to find server for service {} msg {}", service_name, json(msg).dump());
return;
}
call_server(service_name, msg);
}
在一个service被创建的时候,每个进程都会接收到对应的创建通知,内部包含了所在的服务器信息,进程接收到这个消息之后会记录服务与进程之间的关联信息到m_services_by_id这个map成员上:
void space_server::on_notify_service_created(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const json& msg)
{
std::string service_type;
std::string service_id;
std::string service_server;
try
{
msg.at("service_id").get_to(service_id);
msg.at("service_type").get_to(service_type);
msg.at("service_server").get_to(service_server);
}
catch(const std::exception& e)
{
m_logger->error("on_notify_service_created fail to parse msg {} error {}", msg.dump(), e.what());
return;
}
m_services_by_id[service_id] = std::make_pair(service_type, service_server);
}
注意这里的key不是service_type而是一个根据规则生成的service唯一id, 这样做的理由是为了支持同一个service_type创建多个实例来做负载均衡。当需要给一个service发消息的时候,会查询是否有这个服务以及是否已经绑定了进程,这个绑定信息记录在m_services_by_pref上,如果没有绑定则遍历m_services_by_id里这个service_type对应的所有实例,随机选择其中的一个来执行绑定:
std::string space_server::choose_server_for_service(const std::string& service_name)
{
auto temp_iter_1 = m_services_by_pref.find(service_name);
if(temp_iter_1 != m_services_by_pref.end())
{
return temp_iter_1->second.second;
}
std::vector<std::pair<std::string, std::string>> temp_services;
temp_services.reserve(8);
for(const auto& one_pair: m_services_by_id)
{
if(one_pair.second.first != service_name)
{
continue;
}
temp_services.push_back(std::make_pair(one_pair.first, one_pair.second.second));
}
if(temp_services.empty())
{
return {};
}
auto cur_ms = utility::timer_manager::now_ts();
auto cur_result = temp_services[cur_ms % temp_services.size()];
m_services_by_pref[service_name] = cur_result;
m_router->link_anchor_to_connection(service_name, cur_result.second);
m_logger->info("set server {} service_id {} to the pref server of service_type {}", cur_result.second, cur_result.first, service_name);
return cur_result.second;
}
绑定之后就会在network_router中使用link_anchor_to_connection来注册投递地址与网络连接之间的映射,这里使用绑定而不是每次发送消息时都随机从temp_service中抽取一个的理由是保证从一个进程发送到一个投递地址的消息的接收顺序与发送顺序一致。
server_entity对于动态迁移的需求比service大很多,因为可迁移的server_entity数量相对service数量来说大好几个数量级。同时server_entity的迁移触发的比service更加频繁,每次切换场景时都可能触发,而service的迁移时机只有动态扩缩容和容灾。所以对于可迁移的server_entity的消息投递不能采取与service一样的广播推送最新地址的机制,这个机制会导致需要同步的信息量太多。所以针对server_entity的高频迁移特性,其在线消息投递使用的是一种基于中转的消息投递机制,对于这些可迁移的server_entity,在创建的时候就会在所在进程创建一个专门为其做消息转发服务的relay_entity:
player_entity* account_entity::create_player_entity(const std::string& player_id, const json& player_doc)
{
std::string cur_err;
auto cur_relay_entity_id = std::to_string(get_server()->gen_unique_uint64());
json::object_t relay_init_info;
relay_init_info["dest_eid"] = player_id;
relay_init_info["dest_game"] = get_server()->local_server_info().name;
auto cur_relay_entity = get_server()->create_entity("relay_entity", cur_relay_entity_id, gen_online_entity_id(),relay_init_info, cur_err);
if(!cur_relay_entity)
{
m_logger->error("fail to create relay_entity");
return nullptr;
}
m_relay_entity = dynamic_cast<relay_entity*>(cur_relay_entity);
json::object_t player_init_info;
player_init_info["prop"] = player_doc;
player_init_info["prop"]["base"]["account_anchor"] = *get_call_proxy();
player_init_info["prop"]["base"]["gate_name"] = m_gate_name;
player_init_info["is_ghost"] = false;
player_init_info["space_id"] = std::string();
player_init_info["call_proxy"] = *cur_relay_entity->get_call_proxy();
auto cur_entity = get_server()->create_entity("player_entity", player_id, gen_online_entity_id(),player_init_info, cur_err);
}
在创建好了relay_entity之后再创建所需的server_entity,同时将relay_entity的消息投递地址绑定在server_entity的call_proxy上。由于relay_entity是不可迁移的,所以使用call_proxy发送消息可以保证单进程有序与这个relay_entity进行消息通信。这个relay_entity会记录对应的server_entity的最新进程地址,server_entity每次迁移之前都需要通知relay_entity进行进程地址清空,等到relay_entity返回了迁移确认之后这个server_entity才能开始真正的迁移。在server_entity迁移成功之后,再将最新绑定的进程信息发送给relay_entity进行重新绑定。
void relay_entity::request_migrate_begin(const utility::rpc_msg& msg, const std::string& game_id, const std::string& space_id, const std::string& union_space_id, const json::object_t& enter_info)
{
if(!m_dest_actor)
{
m_logger->error("request_migrate_begin while dest_anchor empty dest_game {} dest_eid {}", m_dest_game, m_dest_eid);
return;
}
utility::rpc_msg reply_msg;
reply_msg.cmd = "reply_migrate_begin";
reply_msg.args.push_back(game_id);
reply_msg.args.push_back(space_id);
reply_msg.args.push_back(union_space_id);
reply_msg.args.push_back(enter_info);
call_server(m_dest_actor, reply_msg);
m_dest_actor.reset();
m_dest_game = game_id;
}
void relay_entity::notify_migrate_finish(const utility::rpc_msg& msg, const std::string& game_id)
{
if(m_dest_game != game_id)
{
m_logger->error("notify_migrate_finish while game not match empty dest_game {} dest_eid {} new_game_id {}", m_dest_game, m_dest_eid, game_id);
return;
}
m_dest_actor = std::make_shared<std::string>(utility::rpc_anchor::concat(m_dest_game, m_dest_eid));
if(m_dest_game == get_local_server_name())
{
auto dest_entity = entity_manager::instance().get_entity(m_dest_eid);
if(dest_entity)
{
for(const auto& one_msg:m_cached_msgs)
{
auto cur_cmd_detail = enums::packet_cmd_helper::decode(one_msg.cmd);
auto dispatch_result = dest_entity->on_entity_msg(cur_cmd_detail.second, one_msg.data);
if(dispatch_result != utility::rpc_msg::call_result::suc)
{
m_logger->error("entity {} fail to dispatch cmd {} data {} err {}", m_dest_eid, cur_cmd_detail.second, *one_msg.data, std::uint8_t(dispatch_result));
}
}
m_cached_msgs.clear();
}
}
for(const auto& one_msg: m_cached_msgs)
{
auto cur_cmd_detail = enums::packet_cmd_helper::decode(one_msg.cmd);
call_server(m_dest_actor, one_msg.data, cur_cmd_detail.first, cur_cmd_detail.second);
}
m_cached_msgs.clear();
}
当relay_entity接收到一个转发请求之后,先检查对应server_entity的进程绑定信息,如果没有绑定则先放到m_cached_msgs队列中,否则直接向对应的绑定进程发消息:
void relay_entity::forward(const network::msg_task& cur_msg_task)
{
if(!m_dest_actor)
{
m_cached_msgs.push_back(cur_msg_task);
}
else
{
auto cur_cmd_detail = enums::packet_cmd_helper::decode(cur_msg_task.cmd);
if(m_dest_game == get_local_server_name())
{
auto dest_entity = entity_manager::instance().get_entity(m_dest_eid);
if(dest_entity)
{
auto dispatch_result = dest_entity->on_entity_msg(cur_cmd_detail.second, cur_msg_task.data);
if(dispatch_result != utility::rpc_msg::call_result::suc)
{
m_logger->error("fail to foward dispatch cmd {} data {} with result {}", cur_cmd_detail.second, *cur_msg_task.data, std::uint8_t(dispatch_result));
}
}
else
{
m_logger->error("fail to find dest entity {}", m_dest_eid);
}
}
else
{
call_server(m_dest_actor, cur_msg_task.data, cur_cmd_detail.first, cur_cmd_detail.second);
}
}
}
当重新绑定之后,还需要将m_cached_msgs全都按照接收顺序发送一遍。这样通过一个不可迁移的relay_entity我们就可以实现对可迁移server_entity的稳定有序消息投递了。
上面的relay_entity机制有一个小问题,就是消息发送方需要知道目标server_entity的call_proxy,如果无法获得这个call_proxy,则无法直接使用relay_entity机制。因此这里对于player_entity还有一个另外的在线通知机制,全局有一个login_service会记录所有在线玩家的account_entity地址,而account_entity也是一个不迁移的server_entity,可以保证其与relay_entity在同一个进程上。所以login_service上提供了一个request_call_online接口来执行向指定的player_id发送一个在线消息:
void login_service::request_call_online(const utility::rpc_msg& msg, const std::string& cur_player_id, const std::string& cmd, const std::vector<json>& args)
{
auto cur_iter = m_online_players.find(cur_player_id);
if(cur_iter == m_online_players.end())
{
m_logger->info("request_call_online not online fail to call {} cmd {} args {}", cur_player_id, cmd, json(args).dump());
return;
}
utility::rpc_msg result_msg;
result_msg.cmd = cmd;
result_msg.args = args;
auto cur_server = get_server();
cur_server->call_server(cur_iter->second, result_msg);
}
当account_entity接收到这个消息之后,会调用call_player来手动获取这个同进程的relay_entity并执行转发:
void account_entity::call_player(const utility::rpc_msg& msg)
{
if(!m_relay_entity)
{
return;
}
m_relay_entity->forward(msg);
}
这样在只拥有目标玩家的entity_id的情况下,可以通过login_service中转到account_entity,再中转到relay_entity,最终中转到player_entity。这样做会涉及到三次网络消息的收发,相对于基于call_proxy的中转来说多了一次,所以最好还是能够以call_proxy的方式去执行消息通知。因此很多service上都会有一个map记录当前所有在线玩家的call_proxy,玩家登录完成之后会向这些service推送自己的call_proxy,下面的就是玩家的聊天组件向聊天服务推送在线状态的代码,维护在线玩家call_proxy的相关代码:
void player_chat_component::on_login(bool is_relay)
{
if(is_relay)
{
return;
}
utility::rpc_msg notify_msg;
notify_msg.cmd = "notify_player_login";
notify_msg.set_args(m_owner->entity_id(), *m_owner->get_call_proxy());
m_owner->call_service("chat_service", notify_msg);
}
void player_chat_component::on_logout()
{
utility::rpc_msg notify_msg;
notify_msg.cmd = "notify_player_logout";
notify_msg.set_args(m_owner->entity_id());
m_owner->call_service("chat_service", notify_msg);
}
在聊天服务上使用一个unordered_map来记录注册过来的在线call_proxy, 同时提供一个封装好的call_online_player来处理向在线玩家发消息的需求:
void chat_service::notify_player_login(const utility::rpc_msg& msg, const std::string& player_id, const std::string& call_proxy)
{
m_online_players[player_id] = std::make_shared<const std::string>(call_proxy);
}
void chat_service::notify_player_logout(const utility::rpc_msg& msg, const std::string& player_id)
{
m_online_players.erase(player_id);
}
void chat_service::call_online_player(const std::string& player_id, const std::string& cmd, const std::vector<json>& args)
{
auto cur_iter = m_online_players.find(player_id);
if(cur_iter == m_online_players.end())
{
return;
}
utility::rpc_msg cur_msg;
cur_msg.cmd = cmd;
cur_msg.args = args;
auto cur_server = get_server();
cur_server->call_server(cur_iter->second, cur_msg);
}
就这样每个有通知在线玩家需求的service都自己维护了在线玩家的call_proxy记录,这样就可以避免都通过login_service去执行消息通知,降低login_service的单点压力。
多播消息
多播消息这种一般都是社群系统会使用到,例如群组、队伍、帮派等社群结构,多播系统的最简实现可以就只需要三行代码:使用一个for循环来遍历投递目标的集合然后调用单播消息投递接口。不过这样的实现会导致发送的消息rpc_msg被重复打包多次,浪费很多cpu,因此在server上提供了一个避免了重复消息打包的优化版本:
void space_server::call_server_multi(const entity::server_entity* cur_entity, const utility::rpc_msg& msg, const std::vector<std::string>& targets)
{
auto shared_msg = msg.to_bytes();
call_server_multi(cur_entity, shared_msg, enums::entity_packet::json_rpc, targets);
}
void space_server::call_server_multi(const entity::server_entity* cur_entity, std::shared_ptr<const std::string> msg, enums::entity_packet cur_entity_packet, const std::vector<std::string>& targets)
{
for (auto one_dest : targets)
{
call_server(cur_entity, std::make_shared<std::string>(one_dest), msg, cur_entity_packet);
}
}
这里使用rpc_msg上的to_bytes接口来预先执行消息打包序列化,生成一个shared_ptr<string>,这样就只需要打包一次,同时这个打包好的数据的生命周期能够被引用计数自动托管,业务层就不需要考虑msg生命周期的具体细节了。
不过这里的for循环也有一个可以性能优化的点,循环体内部会创建发送目标的一个shared_ptr<string>,其实更优的结果是创建一个包含了这个targets数组里所有元素的一个大的shared_ptr<string>,然后push_msg发送的时候使用下面的一个结构来编码dest:
struct shared_string_view
{
std::shared_ptr<std::string> parent;
std::string_view str;
};
这样make_shared只需要执行一次就行了,避免了多次动态内存分配相关的cpu损耗,同时整体的生命周期都被shared_ptr托管了,RAII会自动处理这个parent的资源释放。
有了这个call_server_multi接口之后,外部的多播接口只需要中转一下就好了:
void group_service::group_broadcast(const group_resource* cur_group, const std::string& cmd, const std::vector<json>& args, const std::string& except_id, bool without_leader)
{
const auto& group_anchors = cur_group->get_online(except_id, without_leader);
utility::rpc_msg cur_msg;
cur_msg.cmd = cmd;
cur_msg.args = args;
get_server()->call_server_multi(cur_msg, group_anchors);
}
void team_service::team_broadcast(const team_resource* cur_team, const std::string& cmd, const std::vector<json>& args, const std::string& except_id)
{
std::vector<std::string> team_anchors;
team_anchors.reserve(cur_team->m_prop.m_players.index().size());
for(const auto& one_idx: cur_team->m_prop.m_players.index())
{
auto cur_player_ptr = cur_team->m_prop.m_players.get_slot(one_idx.second);
if(cur_player_ptr->m_id == except_id)
{
continue;
}
team_anchors.push_back(cur_player_ptr->anchor());
}
utility::rpc_msg cur_msg;
cur_msg.cmd = cmd;
cur_msg.args = args;
get_server()->call_server_multi(cur_msg, team_anchors);
}
广播消息
游戏内有些逻辑需要往所有的在线玩家发送消息,这个需求虽然类似于多播,但是使用login_service去遍历所有玩家去调用多播接口的话消息流量会爆炸,而且会导致这个单点的卡顿,广播频率高的话可能会影响登录与下线。此时我们注意到这样的暴力广播流程里,很多时候数据的流向是login_service->game_server::relay_entity->game_server::player_entity->gate_server->client,中间的两层game_server不负责任何逻辑,完全执行数据转发任务。在这种情况下可以考虑跳过game_server相关的两层,直接把要广播的数据直接发向gate_server,然后让gate_server去遍历当前进程上绑定的所有客户端来执行在线通知。在这样的优化设计下,数据链路缩短为login_service->gate_server->client,减少了两层中转,同时数据流量也变得可控了,之前需要发送的数据份数为在线玩家的数量,而现在则缩减为gate_server的数量,这样内网流量放大倍率从几万的量级降低到了几十,同时还极大的降低了login_service的负载。
实际项目使用过程中,广播消息不仅仅只有全服广播,还有门派广播、势力广播、场景广播等多种类型。这些类型虽然不需要给所有在线客户端发送通知消息,但是其通知的人员范围依然是一个成百上千的数量,利用之前的多播接口依然会有性能问题。所以对于这些广播操作,采取先发送给所有的gate_server然后再由gate_server进行过滤后广播的方式依然可以节省很多的内网流量,同样可以避免这个广播接口的负载。在考虑了这些自定义的广播组需求之后,我们需要在gate_server上增加一个unordered_map来记录每个广播组下面的人员:
// 每个群组对应的client
// 这里的key string_view 对应的str就是value里的shared_ptr
std::unordered_map<std::string, std::unordered_map<std::string_view, std::shared_ptr<const std::string>>> m_clients_for_group;
玩家角色可以通过下面的两个接口来维护自己与group之间的关系:
void space_server::update_player_broadcast_group(const entity::player_entity* cur_player, const std::vector<std::string>& groups)
{
auto cur_account_id = *cur_player->shared_account_id();
json params;
params["entity_id"] = cur_account_id;
params["groups"] = groups;
json msg;
msg["cmd"] = "request_link_group";
msg["param"] = params;
m_router->push_msg(cur_player->get_gate_id(), m_local_name_ptr, {}, std::make_shared<std::string>(msg.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
void space_server::clear_player_broadcast_group(const entity::player_entity* cur_player, const std::vector<std::string>& groups)
{
auto cur_account_id = *cur_player->shared_account_id();
json params;
params["entity_id"] = cur_account_id;
params["groups"] = groups;
json msg;
msg["cmd"] = "request_unlink_group";
msg["param"] = params;
m_router->push_msg(cur_player->get_gate_id(), m_local_name_ptr, {}, std::make_shared<std::string>(msg.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
gate_server上提供这两个接口来处理上面的两条消息通知,维护内部的m_clients_for_group:
void on_request_link_group(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg);
void on_request_unlink_group(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg);
同时gate_server上暴露一个广播接口on_request_broadcast_groups,方便space_server来调用,这个接口还支持给多个group一起广播:
void gate_server::on_request_broadcast_groups(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
std::vector<std::string> groups;
std::string msg_detail;
std::uint8_t msg_cmd;
try
{
msg.at("groups").get_to(groups);
msg.at("msg_detail").get_to(msg_detail);
msg.at("msg_cmd").get_to(msg_cmd);
}
catch(const std::exception& e)
{
m_logger->error("on_request_broadcast_groups fail to parse {} with error {}", msg.dump(), e.what());
return;
}
auto cur_shared_rpc_msg = std::make_shared<std::string>(std::move(msg_detail));
for(const auto& group: groups)
{
auto temp_iter = m_clients_for_group.find(group);
if(temp_iter == m_clients_for_group.end())
{
continue;;
}
for(const auto& one_entity_pair: temp_iter->second)
{
m_router->push_msg({}, one_entity_pair.second, cur_shared_rpc_msg, enums::packet_cmd_helper::encode(enums::packet_cmd::game_to_client, msg_cmd));
}
}
}
space_server上提供了广播消息的入口:
void space_server::send_msg_to_broadcast_groups(const std::vector<std::string>& groups, std::shared_ptr<const std::string>& rpc_msg)
{
json params;
params["groups"] = groups;
params["msg_detail"] = *rpc_msg;
params["msg_cmd"] = std::uint8_t(enums::entity_packet::json_rpc);
json msg;
msg["cmd"] = "request_broadcast_groups";
msg["param"] = params;
auto cur_shared_msg = std::make_shared<std::string>(msg.dump());
for(const auto& one_gate_info: m_gate_entities)
{
m_router->push_msg(one_gate_info.first, m_local_name_ptr, {}, std::make_shared<std::string>(msg.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
}
而调用这个接口的目前只有专门处理广播的notify_manager:
void notify_manager::notify_broadcast_groups(const std::vector<std::string>& groups, const utility::rpc_msg& msg)
{
auto cur_cmd = msg.cmd;
std::shared_ptr<const std::string> shared_msg = msg.to_bytes();
utility::rpc_msg stat_msg;
stat_msg.cmd = "add_broadcast_group_stat";
stat_msg.set_args(groups, cur_cmd, std::uint32_t(shared_msg->size()));
m_server->call_service("notify_service", stat_msg);
m_server->send_msg_to_broadcast_groups(groups, shared_msg);
}
目前的mosaic_game里只有全服聊天和门派聊天使用了notify_manager提供的这个接口来做广播:
void player_notify_component::send_msg_to_broadcast_groups(const std::vector<std::string>& groups, const utility::rpc_msg& msg)
{
server::notify_manager::instance().notify_broadcast_groups(groups, msg);
}
void player_notify_component::send_msg_to_broadcast_group(const std::string& group, const utility::rpc_msg& msg)
{
m_player->prop_proxy().notify().broadcast_group_send_ts().insert(group, utility::timer_manager::now_ts());
std::vector<std::string> cur_groups;
cur_groups.push_back(group);
server::notify_manager::instance().notify_broadcast_groups(cur_groups, msg);
}
虽然广播给所有客户端占在线广播消息的绝大部分,但是还有一些情况广播数据并不传递到客户端,而是通知到player_entity这一层去执行一些逻辑,此时的数据传递就不能用前述的gate广播组来做。不过这里我们也可以使用类似的设计,将数据发送到所有的game_server,然后由game_server去遍历当前进程上的所有玩家执行广播消息通知。这部分的功能通过notify_service来提供:
void notify_service::notify_all_online(const utility::rpc_msg& msg, const std::string& cmd, const json::object_t& detail)
{
db_logic::notify_msg new_msg;
new_msg.msg.cmd = cmd;
new_msg.msg.args = args;
new_msg.doc_seq = m_online_msgs.size();
m_online_msgs.push_back(new_msg);
if(m_delay_broadcast_timer.valid())
{
return;
}
utility::rpc_msg cur_broadcast_msg;
cur_broadcast_msg.cmd = "sync_add_online_msg";
cur_broadcast_msg.set_args(new_msg);
get_server()->call_server_multi(cur_broadcast_msg, m_broadcast_managers);
}
这个接口会将广播数据通知到所有注册过来的notify_manager上,由于在每个space_server上都会创建一个notify_manager,所以等价于把这个消息广播到了space_server。当space_server上的notify_manager接收到这个请求之后,会遍历当前进程上的所有玩家来通知去查收最新的全服广播消息:
void notify_manager::sync_add_online_msg(const utility::rpc_msg& data, const json& new_online_msg)
{
m_logger->info("sync_add_online_msg {}", new_online_msg.dump());
db_logic::notify_msg temp_msg;
try
{
new_online_msg.get_to(temp_msg);
}
catch(const std::exception& e)
{
m_logger->error("sync_add_online_msg fail for msg {}", new_online_msg.dump());
return;
}
m_online_msgs.push_back(temp_msg);
auto cur_players = entity::entity_manager::instance().get_entities_by_exact_type<
entity::player_entity>();
for(auto one_player: cur_players)
{
one_player->dispatcher().dispatch(enums::event_category::notify, std::string("online"));
}
}
这里的获取所有在线玩家的实现是一个可以优化的点,内部会使用dynamic_cast来将server_entity转换到player_entity,如果这个有比较明显的性能瓶颈的话,推荐在notify_manager上维护好一个单独的在线player_entity的集合,这样就可以避免每次都执行这个get_entities操作。
玩家身上的player_notify_component注册了这个广播数据的接收,并重定向到rpc的处理:
void player_notify_component::event_listener(const utility::enum_type_value_pair& ev_cat, const std::string& detail)
{
if(ev_cat != utility::enum_type_value_pair(enums::event_category::notify))
{
return;
}
m_owner->logger()->info("player_notify_component event_listener event {} value {} detail {}", ev_cat.enum_type, ev_cat.enum_value, detail);
if(detail == "all")
{
handle_online_msgs();
handle_db_msgs();
}
else if(detail == "online")
{
handle_online_msgs();
}
else if(detail == "db")
{
handle_db_msgs();
}
}
void player_notify_component::handle_online_msgs()
{
const auto& online_msgs = server::notify_manager::instance().online_msgs();
if(online_msgs.size() <= m_player->prop_data().notify().online_seq_read())
{
return;
}
for(auto i = m_player->prop_data().notify().online_seq_read(); i< online_msgs.size(); i++)
{
on_new_notify(online_msgs[i], true);
}
}
注意到这里我们还给这个消息添加了一个唯一递增序列号,玩家自身也记录一个不存库的属性来表明自身已经读取到哪一个在线信息了,玩家每处理一个在线信息就对这个属性进行更新:
void player_notify_component::on_new_notify(const db_logic::notify_msg& new_msg, bool is_online)
{
m_owner->logger()->info("on_new_notify {}", json(new_msg).dump());
if(is_online)
{
if(m_player->prop_proxy().notify().online_seq_read().get() < new_msg.doc_seq)
{
m_player->prop_proxy().notify().online_seq_read().set(new_msg.doc_seq);
m_player->on_rpc_msg(new_msg.msg);
}
else
{
m_owner->logger()->error("duplicated on_new_notify {}", json(new_msg).dump());
}
}
else
{
if(m_player->prop_proxy().notify().db_seq_read().get() < new_msg.doc_seq)
{
m_player->prop_proxy().notify().db_seq_read().set(new_msg.doc_seq);
m_player->on_rpc_msg(new_msg.msg);
}
else
{
m_owner->logger()->error("duplicated on_new_notify {}", json(new_msg).dump());
}
}
}
这里的序列号主要是为了避免广播消息在同一个player_entity上重复执行,下面就是一种可能出现重复处理消息通知的时间线
T1 notify_service::notify_all_online msg_1
T2 space_server_1::notify_manager::sync_add_online_msg msg_1
T3 space_server_1::player_A::on_new_notify msg_1
T4 space_server_1::player_A::migrate_out
T5 space_server_2::player_A::migrate_in
T6 space_server_2::notify_manager::sync_add_online_msg msg_1
player_A在space_server_1处理了一条广播消息msg_1之后发生了迁移,迁移到了space_server_2之后,这个msg_1才传递到space_server_2::notify_manager上,此时如果不判断消息的序列号的话,就会出现消息的重复通知。
除了重复通知之外,还可能出现消息的通知丢失问题,主要出现在玩家的迁移过程中。此时玩家的entity被销毁,其数据正在网络中中转,任意一个space_server都没有其对应的entity,因此消息会通知不到。为了解决这个问题,玩家在迁移结束之后会重新拉取一下notify_manager里存储的消息,判断是否需要处理:
bool player_notify_component::init(const json& data)
{
m_player = dynamic_cast<player_entity*>(m_owner);
if(!m_player)
{
return false;
}
m_owner->dispatcher().add_listener(enums::event_category::notify, &player_notify_component::event_listener, this);
m_player->login_dispatcher().add_listener(&player_notify_component::on_login, this);
m_player->logout_dispatcher().add_listener(&player_notify_component::on_logout, this);
// 下面这行负责处理迁移完成之后的消息拉取
m_owner->migrate_in_finish_dispatcher().add_listener(
&player_notify_component::on_migrate_in_finish, this);
return true;
}
void player_notify_component::on_migrate_in_finish()
{
handle_db_msgs();
handle_online_msgs();
}
离线消息投递
本章前面的内容介绍的都是如何向一个或多个在线的客户端发送消息,其核心在于在线,如果玩家不在线或者中途断线再上线,这些消息他就接收不到。所以这类在线消息通知只能用于一些提示性的业务,消息的接收与否不能影响服务端的逻辑正确性。如果我们需要向一个或多个玩家发送一些保证能接收到的消息,前述的机制就无法使用了,必须引入一种依赖于数据库的消息接收确认机制,来保证这个消息在玩家上线后能够及时的处理。
单播消息
跟在线消息投递一样,离线单播消息也是离线消息投递的基础。确保消息被接收我们可以仿照TCP的ACK机制,每个玩家的离线消息地址当作一个先进先出的队列,这个地址里接收到的数据会按照到达序分配一个递增流水号,同时将这条数据进行存库。然后玩家自己身在线后定期从离线消息队列中拉取头部的若干个消息进行处理,并删除已经处理完成的数据。
在mosaic_game中也的确是这样实现的,在数据库中创建了一个单独的表OfflineMsg来存储所有玩家的离线消息通知,这个库里的每条消息都有一个entity_id字段代表对应的玩家id,同时有一个doc_seq字段代表这条消息的唯一序列号:
"OfflineMsg": [
[
[["entity_id", 1]],
{
}
],
[
[["entity_id", 1], ["doc_seq", 1]],
{
"unique": true
}
]
]
这个数据库的操作都被封装到了entity_db_msg_manager_base中,在cpp中定义了消息的完整格式:
struct entity_db_msg
{
std::string entity_id;
json::object_t detail;
std::uint64_t doc_seq;
std::uint64_t ts;
NLOHMANN_DEFINE_TYPE_INTRUSIVE(entity_db_msg, entity_id, detail, doc_seq, ts)
};
对于每个entity_id, 其doc_seq为0的行会作为一个元数据行,里面有一个额外的字段used_seq代表这个玩家的离线消息里使用的最大流水号,每次向这个玩家发送离线消息的时候,需要先查询这条doc_seq==0的数据里记录的used_seq:
void entity_db_msg_manager_base::add_msg(const std::string& entity_id,const json::object_t& detail)
{
std::shared_ptr<entity_db_msg> cur_entity_db_msg = std::make_shared<entity_db_msg>();
cur_entity_db_msg->entity_id = entity_id;
cur_entity_db_msg->detail = detail;
cur_entity_db_msg->doc_seq = 0;
cur_entity_db_msg->ts = utility::timer_manager::now_ts();
auto cur_db_callback = [cur_entity_db_msg, this](const json& db_reply)
{
on_query_seq_back(cur_entity_db_msg, db_reply);
};
tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::modify_update, std::string{}, "", collection_name());
json query_doc, db_doc;
query_doc["entity_id"] = meta_doc_id(entity_id);
query_doc["doc_seq"] = 0;
db_doc["$inc"]["used_seq"] = 1;
auto cur_modify_task = tasks::db_task_desc::modify_task::modify_one(cur_task_base, query_doc, db_doc, true, true);
run_db_task(cur_modify_task, cur_db_callback);
}
由于这个玩家的doc_seq==0的行可能还没有创建,所以查询的时候使用的接口是modify_one,代表如果没有的话就以默认值来创建。在这个db操作的回调on_query_seq_back,使用最新递增之后作为这条消息的序列号,最后才能插入到数据库中:
cur_entity_db_msg->doc_seq = used_seq;
auto cur_db_callbak = [cur_entity_db_msg, this](const json& db_reply)
{
on_add_msg_back(cur_entity_db_msg, db_reply);
};
tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::insert_one, std::string{}, "", collection_name());
json db_doc = *cur_entity_db_msg;
auto cur_insert_task = tasks::db_task_desc::insert_task::insert_one(cur_task_base, db_doc);
run_db_task(cur_insert_task, cur_db_callbak);
插入完成之后,还需要执行一个通知操作,这样如果这个玩家在线的话就会去立即拉取数据库中存好的通知信息:
void entity_db_msg_manager_base::on_add_msg_back(std::shared_ptr<entity_db_msg> cur_entity_db_msg, const json& db_reply)
{
m_logger->info("on_add_msg_back for msg {} db_reply {}", json(*cur_entity_db_msg).dump(), db_reply.dump());
notify_pull_msg(cur_entity_db_msg);
}
void offline_msg_manager::notify_pull_msg(std::shared_ptr<db_logic::entity_db_msg> cur_msg)
{
utility::rpc_msg notify_msg;
notify_msg.cmd = "request_call_online";
std::vector<json> notify_cmd_args;
notify_cmd_args.push_back(cur_msg->doc_seq);
notify_msg.set_args(cur_msg->entity_id, std::string("notify_pull_offline_msg"), notify_cmd_args);
m_server->call_service("login_service", notify_msg);
}
这里由于不知道这个玩家的在线call_proxy,所以只能委托到login_service去执行在线通知,所以login_service的压力还是比较大的。
注意到这个通知消息并没有将当前的entity_db_msg打包过去,只打包了序列号,这是因为要保证在OfflineMsg中一个entity_id对应的数据要严格按照序列号来处理。在我们目前的设计中,一次add_msg会触发两次数据库写入操作,然后再加上一次login_service->relay_entity->player_entity的双层转发操作,这样的多次异步过程在多进程结构中是无法保证player_entity接收到的消息是按序到达的。举个例子来说space_server_1与space_server_2对同一个entity_id执行add_msg操作,space_server_1的on_query_seq_back的返回值里序列号递增为了2,而 space_server_2的on_query_seq_back的返回值里序列号递增为了3。然后在后续的多个网络发送中,可能会出现3这个notify_pull_offline_msg请求优先到达player_entity上,如果此时立即把带过来的3对应的通知消息处理的话,就会导致之前设定的严格按照递增序处理离线消息这个规则被违反。
当player_entity接收到这个只带序号的消息通知之后,会立即再拉取OfflineMsg数据库中的未处理数据:
void player_offline_msg_component::notify_pull_offline_msg(const utility::rpc_msg& msg, std::uint64_t cur_msg_seq)
{
auto temp_iter = m_done_msg_seqs.find(cur_msg_seq);
if(temp_iter != m_done_msg_seqs.end())
{
// 这条消息之前已经处理过了
return;
}
m_remain_msg_seqs.insert(cur_msg_seq);
if(!m_is_pulling_msg)
{
pull_msg_impl();
}
}
void player_offline_msg_component::pull_msg_impl()
{
m_owner->logger()->info("entity {} pull_msg_impl ", m_owner->entity_id());
m_is_pulling_msg = true;
std::function<void(const std::string&, const std::vector<db_logic::entity_db_msg>& )> pre_cb = std::bind(&player_offline_msg_component::handle_new_offline_msgs, this, std::placeholders::_1, std::placeholders::_2);
server::offline_msg_manager::instance().pull_msg(m_owner->entity_id(), 10, m_owner->convert_callback(pre_cb));
}
当拉取到新的未处理数据之后,按照顺序处理拉取的数据,处理完成之后删除已处理的,然后再执行一次拉取:
std::vector<std::uint64_t> temp_msg_seqs;
for(const auto& one_msg: result_msgs)
{
if(m_done_msg_seqs.find(one_msg.doc_seq) != m_done_msg_seqs.end())
{
// 出现数据已经处理但是db还未完全删除 但是新的一次pull 又把数据拉出来的情况
temp_msg_seqs.push_back(one_msg.doc_seq);
continue;
}
m_done_msg_seqs.insert(one_msg.doc_seq);
m_remain_msg_seqs.erase(one_msg.doc_seq);
temp_msg_seqs.push_back(one_msg.doc_seq);
utility::rpc_msg cur_rpc;
try
{
one_msg.detail.at("cmd").get_to(cur_rpc.cmd);
one_msg.detail.at("args").get_to(cur_rpc.args);
}
catch(const std::exception& e)
{
m_owner->logger()->error("fail to decode offline msg {}", json(one_msg.detail).dump());
continue;
}
m_owner->rpc_owner_on_rpc(cur_rpc);
}
server::offline_msg_manager::instance().del_msg(m_owner->entity_id(), temp_msg_seqs);
pull_msg_impl();
当某次拉取数据得到的是空数据之后,才停止拉取。但是这里又有一个异步导致的问题,相关函数调用的时机如下所示:
T1: notify_pull_offline_msg 2 -> pull_msg_impl
T2: handle_new_offline_msgs [2] ->pull_msg_impl
T3: notify_pull_offline_msg 3
T4: handle_new_offline_msgs []
T1时刻玩家收到了序列号2的通知,此时发起了一次未读队列拉取请求pull_msg_impl,T2时刻处理完序列号2的数据之后再发起了一次未读队列拉取请求,T3时刻又有一个新的序列号3过来。但是这个未读队列在T2拉取时并没有读取到3的数据,导致T4时刻返回了空集合,判定为数据处理结束,这样就导致数据3停留在数据库之中,直到下次pull_msg_impl被执行。所以这里会先用一个std::unordered_set<std::uint64_t> m_remain_msg_seqs;来存储所有接收到的未处理消息序列号,只有当pull_msg_impl的结果为空且m_remain_msg_seqs才停止拉取,如果只是pull_msg_impl则开启一个短间隔的计时器去延迟拉取:
if(result_msgs.empty())
{
if(m_remain_msg_seqs.empty())
{
m_is_pulling_msg = false;
return;
}
else
{
m_owner->add_timer_with_gap(std::chrono::seconds(1), [this]()
{
pull_msg_impl();
});
return;
}
}
上面介绍的是消息不需要保存历史记录时的处理机制,如果消息被处理之后不能立即删除,则玩家身上需要记录一个存库的字段,来表明当前已经处理的离线数据的最大流水号,查询的时候需要使用这个最大已读流水号去过滤。同时还需要一个存库的字段去记录当前最大未读流水号,这样能够更快的知道还剩多少消息未处理。这种带历史记录的离线保序消息处理的细节可以参考邮件系统里player_email_component里的相关逻辑。
多播消息
多播消息的在线投递机制非常简单,就是对单播消息的在线投递的一个循环调用。但是多播消息的离线投递机制则无法直接复用单播消息的离线投递机制,因为这个消息不能被单人处理后直接从数据库中删除。所以每个人需要对这个多播消息做一个最大已处理序号的记录,各自拉取消息的时候需要加入已读最大值作为查询条件。这个多播消息的典型应用场景就是群组聊天的通知,由于这部分的内容已经被聊天和群组覆盖了,实现细节上与邮件系统大同小异,所以这里就不再详细介绍。
广播消息
广播消息的离线推送实现上基本照抄自广播消息的在线player_entity推送,相关rpc流程基本是类似的,只不过在notify_service发出广播之前,需要先将数据存库,每一个消息都有一条数据库记录:
void notify_service::notify_all_db(const utility::rpc_msg& msg, const std::string& cmd, const json::array_t& args)
{
db_logic::notify_msg new_msg;
new_msg.msg.cmd = cmd;
new_msg.msg.args = args;
new_msg.doc_seq = 0;
if(m_next_db_seq == 0)
{
m_db_msgs.push_back(new_msg);
return;
}
new_msg.doc_seq = m_next_db_seq;
m_next_db_seq++;
m_db_msgs.push_back(new_msg);
save_db_seq();
tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::insert_one, std::string{}, "", m_collection_name);
json db_doc = new_msg;
auto cur_db_callback = [this](const json& db_reply)
{
on_insert_seq_back(db_reply);
};
auto cur_insert_task = tasks::db_task_desc::insert_task::insert_one(cur_task_base, db_doc);
get_server()->call_db(cur_insert_task->to_json(), cur_db_callback);
if(m_delay_broadcast_timer.valid())
{
return;
}
utility::rpc_msg cur_broadcast_msg;
cur_broadcast_msg.cmd = "sync_add_db_msg";
cur_broadcast_msg.set_args(new_msg);
get_server()->call_server_multi(cur_broadcast_msg, m_broadcast_managers);
}
值得注意的是这里的m_db_msgs数组中并不会存储所有的的离线消息,而只存储本次服务器启动之后加入的离线消息。所以玩家在获取指定序列号的离线消息的时候,这条消息可能并不在notify_manager的m_db_msgs数组中,需要去数据库里拉取这些数据:
void player_notify_component::handle_db_msgs()
{
if(m_player->prop_data().notify().db_seq_read() + 1 >= server::notify_manager::instance().next_db_seq())
{
return;
}
std::uint64_t cached_db_begin = server::notify_manager::instance().next_db_seq();
const auto& cached_msgs = server::notify_manager::instance().db_msgs();
if(!cached_msgs.empty())
{
cached_db_begin = cached_msgs.front().doc_seq;
}
m_owner->logger()->debug("handle_db_msgs self seq {} cached_db_begin {}", m_player->prop_data().notify().db_seq_read(), cached_db_begin);
if(m_player->prop_data().notify().db_seq_read() + 1 < cached_db_begin)
{
// pull from db
std::uint64_t pull_seq_end = cached_db_begin - 1;
std::uint64_t pull_seq_begin = m_player->prop_data().notify().db_seq_read() + 1;
if(pull_seq_end - pull_seq_begin > 10)
{
pull_seq_end = pull_seq_begin + 10;
}
pull_unread_db_msgs(pull_seq_begin, pull_seq_end);
}
else
{
// pull from memory
if(cached_msgs.empty())
{
return;
}
for(auto i = m_player->prop_data().notify().db_seq_read() - cached_msgs.front().doc_seq + 1; i< cached_msgs.size(); i++)
{
on_new_notify(cached_msgs[i], false);
}
}
}
notify().db_seq_read()里存储的是最大离线消息已处理序列号,这个属性是要存库的,而在线消息notify().online_seq_read()则不存库。
handle_db_msgs在被触发的时候会检查下一个要处理的数据是否在notify_manager中,不在的话拉取后续的10条数据来渐进处理:
void player_notify_component::pull_unread_db_msgs(std::uint64_t seq_begin, std::uint64_t seq_end)
{
const std::string notify_db_name = "NotifyMsg";
auto cur_reply_cb = m_owner->convert_callback(m_owner->add_callback([=](const json& db_result)
{
this->pull_unread_db_msgs_cb(seq_begin, seq_end, db_result);
}));
tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::find_multi, std::string{}, "", notify_db_name);
json query_filter, sort;
query_filter["doc_seq"]["$lte"] = seq_end;
query_filter["doc_seq"]["$gte"] = seq_begin;
sort["doc_seq"] = 1;
auto cur_find_task = tasks::db_task_desc::find_task::find_multi(cur_task_base, query_filter, seq_end - seq_begin + 1, {}, 0, tasks::db_task_desc::read_prefer_mode::secondary, sort );
m_owner->call_db(cur_find_task->to_json(), cur_reply_cb);
}
在数据查询回来之后再使用之前在线消息分发使用的on_new_notify来分发消息,这个共享接口里通过一个bool值来区分是在线消息和离线消息。处理完一个批次之后会再使用handle_db_msgs来检查是否还有后续消息要处理:
void player_notify_component::pull_unread_db_msgs_cb(std::uint64_t seq_begin, std::uint64_t seq_end, const json& db_reply)
{
std::vector<db_logic::notify_msg> result_docs;
std::string error;
tasks::db_task_desc::task_reply cur_reply;
// 省略反序列化相关代码
for(const auto& one_msg: result_docs)
{
on_new_notify(one_msg, false);
}
if(seq_end > m_player->prop_data().notify().db_seq_read())
{
m_player->prop_proxy().notify().db_seq_read().set(seq_end);
}
handle_db_msgs();
}
游戏中的寻路系统
在前面的章节中我们已经讨论了如何在位置变化时做移动同步。对于由客户端操控的entity来说,其位置更新主要是由客户端的输入来驱动的。而对于服务端管理的NPC等类型的entity来说,则是由AI发起的寻路任务驱动的。这个寻路任务主要包括三种:
- 移动到特定点的指定半径内
- 趋近特定可移动
entity的指定半径内 - 跟随特定可移动
entity,维持与这个entity的距离在一定半径内
跟随移动可以通过不断的调用趋近移动来完成,而趋近移动则可以通过不断的调用指定点的移动来完成,所以寻路任务可以简化为如何一个entity从A点移动到B点,这个任务的核心问题就是从地图上找出两点之间的一条可移动路径。在不同的游戏世界中,不同的entity的可移动区域可能各有规定,例如在同一个3D空间,有些entity只能在地面行走,有些entity有攀爬功能可以在岩壁上移动,甚至有些entity可以在空中飞行水下潜行。针对不同的移动能力,我们需要对游戏场景采用不同的结构来表示可移动区域,并使用不同的寻路算法来计算两点之间的可移动路径。下面我们来针对游戏中常见的地图类型来介绍其可移动区域的结构建模以及路径查询。
基于图的寻路
游戏地图中最简单的用来描述可移动区域的结构就是图。在图论中,一个图G的数学定义为G=(V, E),其中V为图G中的节点集合,而E为图G中的边集合,每条边都由(v1, v2)这个二元组来表示,v1,v2都属于节点集合V。如果E中不区分二元组内两个元素的顺序,即(v1, v2)等价于(v2, v1),则这个图称之为无向图,反之称之为有向图。下图中就是上海的地铁线路图的一部分,他也可以转换为一个无向图:

如果存在一个有序节点列表v[0],....,v[n],将其内部所有元素与后面一个元素相连接构造出来的(v[0], v[1]),..., (v[n-1],v[n])所代表的n条边都在图G的边集合E中,则定义v[0], v[n]两者之间是连通的,(v[0], v[1]),..., (v[n-1],v[n])是这两个节点之间的连通路径。如果图中任意两点都是连通的,那么图被称作连通图。如果此图是有向图,则称为强连通图。由于无向图可以通过将一条边(v1, v2)转换为两条顺序相反的有向边的方法来构造有向图,所以后文中我们只讨论有向图。
我们前面提出的两点之间的可移动路径查询问题现在可以标准化为一个数学问题:在给定的图G=(V, E)中,从集合V中选取两个不同的点a, b,计算出a->b的连通路径。这个问题可以认为是一个已经解决的数学问题,在算法导论中提供了两种算法来解决这个问题,宽度优先搜索和深度优先搜索,分别简称为BFS(breadth-first-search)和DFS(depth-first-search):
- 宽度优先搜索时,我们构建一个队列
Q,一个集合O以及一个映射P,初始时将a压入Q,同时O.add(a)。然后不断的对Q进行pop操作获取元素c,将由c作为开始节点的所有边的终点d放入到Q中。往Q中push任意一个元素d时都需要检查O中是否已经包含d, 如果包含的则不处理这个元素;如果不包含才能执行push,同时此时记录P[d]=c。当遇到d==b时,则认为找到一条路径L,不断的执行e=P[d],L.add((e, d)),d=e这个迭代,直到d==a,此时reverse(L)就是所需要寻找的一条从a到b的连通路径。当Q为空时,则代表a到b的连通路径不存在。

- 深度优先搜索时,我们类似的构造一个集合
O以及一个映射P,同时定义一个函数dfs(c),这个函数接受一个节点c作为参数,函数体则对由c作为开始节点的所有边的终点d执行if !O.exist(d) then O.add(d),P[d]=c,dfs(d)操作。然后我们调用dfs(a)来触发递归调用,当遇到d==b时,则认为找到一条路径L,不断的执行e=P[d],L.add((e, d)),d=e这个迭代,直到d==a,此时reverse(L)就是所需要寻找的一条从a到b的连通路径。如果dfs(a)完成调用后仍然没有找到一条连通路径,则代表a到b的连通路径不存在。

上面两个路径搜索方法的最坏情况下都需要遍历所有的边,例如图中的所有点构成一个圆环,所以其最坏时间复杂度都是边数量的常数倍。其一般情况下的复杂度也没有优劣之分,不过实际上采用的都是宽度优先搜索,因为深度优先搜索会使用递归,递归带来的性能影响在函数体比较小的时候非常明显,使用栈去模拟递归又要写很多额外代码。
如果a,b两点之间并不连通,则不管是BFS还是DFS都会搜索所有可以由a到达的节点集合S,遍历整个集合S之后才能判定两者无法连通。例如搜索从杭州到三亚的高铁换乘路线会穷尽国内基本所有的高铁站。此时的搜索效率是非常低的,为了最终判定两者不连通会浪费很多时间。所以在游戏实际使用的地图中,一般还会有一个数据结构来辅助查询a,b两点之间是否连通,路径搜索时如果发现这两个点压根就不连通的话就不再执行后续的BFS或DFS过程。直接在离线情况下预处理地图生成unordered_map<int, unordered_set<int>> connected这样的连通性矩阵耗时耗力,而且其存储空间也要求很多,我们需要一个更优秀的结构去处理连通性问题。
对于无向图来说,这个连通查询结构很好做,只需要在每个节点上增加一个字段region_id,所有region_id相同的节点对a,b都是可以连通的,region_id不同的节点对a,b都是不连通的。下面的代码就负责预处理整个地图给所有的节点都赋值相应的region_id:
struct vertex
{
int id;
int region_id = 0;
unordered_set<int> edges;
};
void set_region_id_for_vertexes(unordered_map<int, vertex>& all_vertexes)
{
int region_id_counter = 0;
for(auto& [one_id,one_vertex]: all_vertexes)
{
if(one_vertex.region_id != 0)
{
continue;
}
region_id_counter++;
queue<int> bfs_queue;
bfs_queue.push(one_id);
while(!bfs_queue.empty())
{
auto temp_id = bfs_queue.front();
bfs_queue.pop();
auto& cur_bfs_vertex = all_vertexes[temp_id];
if(cur_bfs_vertex.region_id != 0)
{
continue;
}
cur_bfs_vertex.region_id = region_id_counter;
for(const auto& one_edge: cur_bfs_vertex.edges)
{
bfs_queue.push(one_edge);
}
}
}
}
由于每个节点只会设置region_id一次,每条边也只会遍历到一次,所以整体的消耗时间等价于边的数量加上节点的数量,是线性复杂度,非常高效。
而对于有向图来说,情况复杂了起来,只用region_id是不够的,我们不能因为a->b有一条路径就将这两个点的region_id设置为同一个值,因为可能b->a并没有可行路径。有用同一个region_id的集合S内的任意两点a,b都需要能找到a->b的路径和b->a的路径,即S是图G中的一个强连通子图。我们从G中以下面的方法构造一个新的图G2:遍历G中的每个强连通子图S,删除原来两个点都在S内的边,将整个子图S里的点都替换为同一个新的点v2,对应的更新边的集合E进行节点替换以及重复边的去重。这样我们就得到了一个节点数量和边数量都精简很多的新的有向无环图DAG(Directed Acyclic Graph)G2`。
TODO 换图 自己画一个

判断原来图G中两个点a,b是否可连通只需要去判断a所在的强连通子图Sa在G2中的点Va与b所在的强连通子图Sb在G2中的点Vb是否连通。由于G2的规模相对G来说小了很多,其连通性判定会节约很多时间,同时对于内存的要求也相对于前述的连通性矩阵降低了很多。现在遗留下来的问题就是我们如何将图G切分为多个强连通分量。
在有向图G中寻找其内部所有强连通分量这个问题已经有现成的算法,叫Tarjan算法。在介绍该算法之前,我们需要先回顾一下前面介绍的DFS, 这里引入一个DFS生成树的结构,来记录DFS时每个节点的前序节点。我们以下面的有向图为例:

有向图的 DFS生成树主要有4种边(不一定全部出现):
- 树边
(tree edge):绿色边,每次搜索找到一个还没有访问过的结点的时候就形成了一条树边。 - 反祖边
(back edge):黄色边,也被叫做回边,即指向祖先结点的边。 - 横叉边
(cross edge):红色边,它主要是在搜索的时候遇到了一个已经访问过的结点,但是这个结点并不是当前结点的祖先时形成的。 - 前向边
(forward edge):蓝色边,它是在搜索的时候遇到子树中的结点的时候形成的。
有了上述定义之后,我们可以利用DFS来计算图的所有强连通分量:如果结点u是某个强连通分量在搜索树中遇到的第一个结点,那么这个强连通分量的其余结点肯定是在搜索树中以u为根的子树中。u被称为这个强连通分量的根。为了证明这个性质我们采取反证法:假设有个结点v在该强连通分量中但是不在以u为根的子树中,那么u到v的路径中肯定有一条离开子树的边。但是这样的边只可能是横叉边或者反祖边,然而这两条边都要求指向的结点已经被访问过了,这就和u是第一个访问的结点矛盾了。
在Tarjan算法中为每个结点u维护了以下几个变量:
dfn[u]:深度优先搜索遍历时结点u被搜索的次序。low[u]:设以u为根的子树为Subtree(u)。low[u]定义为以下两类结点的dfn的最小值:Subtree(u)中的结点;- 从
Subtree(u)通过一条不在搜索树上的边能到达的结点。
一个结点的子树内结点的dfn都大于该结点的dfn。从根开始的一条路径上的dfn严格递增,low严格非降。按照深度优先搜索算法搜索的次序对图中所有的结点进行搜索。在搜索过程中,对于结点u和与其相邻的且不是u的父节点的结点v考虑3种情况:
v未被访问:继续对v进行深度搜索。在回溯过程中,用low[v]更新low[u]。因为存在从u到v的直接路径,所以v能够回溯到的已经在栈中的结点,这里就包括u。v被访问过,已经在栈中:即已经被访问过,根据low值的定义(能够回溯到的最早的已经在栈中的结点),则用dfn[v]更新low[u]。v被访问过,已不在在栈中:说明v已搜索完毕,其所在连通分量已被处理,所以不用对其做操作。 下面的就是上述算法的伪代码:
TARJAN_SEARCH(int u)
vis[u]=true
low[u]=dfn[u]=++dfncnt // 为节点u设定次序编号和Low初值
push u to the stack // 将节点u压入栈中
for each (u,v) then do // 枚举每一条边
if v hasn't been search then // 如果节点v未被访问过
TARJAN_SEARCH(v) // 继续向下搜索
low[u]=min(low[u],low[v]) // 回溯
else if v has been in the stack then // 如果节点u还在栈内
low[u]=min(low[u],dfn[v])
if (DFN[u] == Low[u]) // 如果节点u是强连通分量的根
repeat v = S.pop // 将v退栈,为该强连通分量中一个顶点
print v
until (u== v)
对于一个连通分量图,我们很容易想到,在该连通图中有且仅有一个dfn[u]==low[u]。该结点一定是在深度遍历的过程中,该连通分量中第一个被访问过的结点,因为它的dfn值和low值最小,不会被该连通分量中的其他结点所影响。因此,在回溯的过程中,判定dfn[u]==low[u]的条件是否成立,如果成立,则栈中从u后面的结点构成一个强连通分量。整个算法需要执行两次DFS,所需时间为图G的大小的线性时间。
前述内容解决了如何在有向图中寻找a,b两点的一条连通路径的问题,但是寻路问题并不是简单的获取一条连通路径,因为在连通路径有多条的情况下,我们更希望获取一条消耗最低的路径。

为了定义路径的消耗,我们需要对图G中的每条边增加一个cost,代表路过这条边时需要增加的代价,而一条路径的总cost则是路径中所有边的代价之和。
在给定的无负的cost的有向图G中寻找给定两个节点a,b的最短路径是一个已经解决的问题,使用的是经典的Dijkstra单源最短路径算法。Dijkstra算法负责计算一个点到图内其他点的最短距离,主要内容是维护两个集合,即已确定最短路径的结点集合CloseSet(A)、这些结点向外扩散的邻居结点集合OpenSet(B),同时还维护两个数组:
- 一个距离数组
dis来记录每个节点到起点a的距离,初始时都是负无穷大,同时dis(a)初始化为0。 - 一个前序数组
pre来记录每个节点到起点a的最短路径中的最后一条边的起点。
在初始化上述变量之后,程序逻辑如下:
- 把起点
a放到A中,把每条以a为起点的边的终点v放到B中,更新dis(v) = cost(a,v)。 - 从
B中找出dis(u)最小的节点u,放到A中。 - 把每条以
u为起点的边的终点v进行如下处理:- 如果
v已经在集合A中则不做处理, - 如果
v不在集合A中,则加入到集合B,同时计算temp_cost= dis(u)+cost(u,v)。如果这个temp_cost小于已经记录的dis(v)则更新dis(v)=temp_cost同时更新pre[v]=u。
- 如果
- 重复步骤
2和步骤3,直到B为空
上面的算法流程就是dijkstra算法的完成流程,朴素的dijkstra实现在步骤2中直接遍历所有元素来获取最小值,此时算法的时间复杂度为|V|*|V|, 采取了斐波那契堆优化之后,最优复杂度可以降低到|E| + |V|log(|V|)。

由于我们目前只需要计算a,b两点之间的最短路径,所以在步骤2中发现u==b时就可以提前结束算法迭代。算法结束时如果B为空代表a,b不连通,如果u==b代表a,b连通,此时构造结果路径列表L,执行do L.push(u), u=pre[u] while u!=a,执行结束之后reverse(L)就是我们所需要的一条a->b的最短路径。最坏情况下b点恰好是离a最远的一个点,此时计算a,b之间的最短路径需要跑完完整的dijkstra算法,因此计算在图G中计算a,b两点之间的最短路径算法的时间复杂度与dijkstra是一样的。
基于网格的二维平面寻路
如果游戏场景只有x,y两个坐标轴的话,这个场景就是一个二维平面。针对二维平面最简单的可行走区域表示结构为二维网格grid,内部存储了网格内每个基本单元cell存储一个bool值,代表该cell是否可以通行:
struct grid
{
vector<vector<bool>> cells;
}
将整个二维平面转换为网格的方法也很简单,选取一定粒度的正方形作为cell的形状,然后以这个cell作为单位粒度对原始的场景坐标轴进行变换。如果变换后由(x,y)->(x+1,y)->(x+1,y+1)->(x,y+1)这四个点组成的正方形内有不可同行的区域,则cells[x][y]设置为false,反之设置为true。从x,y出发可以直接移动到达的其他节点根据不同的设定有不同的定义:
- 只允许往上下左右四个方向移动,即以
x,y作为起点的边最多四条,其终点集合为(x+1,y),(x-1,y),(x,y-1),(x,y+1)这四个cell - 允许往上下左右左上左下右上右下八个方向移动,即以
x,y作为起点的边最多八条,其终点集合为(x+1,y+1),(x+1,y),(x+1,y-1),(x,y-1),(x,y+1),(x-1,y+1),(x-1,y),(x-1,y-1)这八个cell
整个网格地图可以很方便的转换为前述的无向图结构G=(V, E),这里的所有的cells[x][y]==true的x,y坐标构成了当前场景的无向图G中的V节点集合,然后每个(x,y)直接相邻的每个点(m,n)对应的cells[m][n]==true,则将(x,y)->(m,n)加入到边集合E中。假如当前grid中所有cell[x][y]都是true,则此时边集合E内会包含(x*y)*(x*y - 1)/2条边,与grid内的cell数量的二次方成正比,而这些边在grid结构体中不需要额外的字段去存储。所以在二维平面上地图表示主要使用grid,而不是通用的无向图结构。
由于在写代码时四方向移动比八方向移动简单点,所以后面我们主要讨论网格地图上的四方向移动。在grid地图上执行寻路时,我们可以沿用之前使用的dijkstra算法来计算两个cell之间的最短路径:

上图就是dijkstra算法在一个grid地图上执行后的结果,粉色方块为起点,紫色方块为终点,白色间隔线段为计算的最短路径,其他的彩色格子代表算法执行过程中遍历到的其他节点。可以看出在这个全连通grid上dijkstra算法计算出一条长度为n的最短路径时,所遍历到的cell数量为n*n,这种时间复杂度对于稍微长一点的路径来说不可接受,但这却是计算最短路径不可避免的代价。如果我们放松对路径长度的最短需求,只要求寻路时获取一条局部最优的连通路径,将极大的减少节点的搜索空间,降低搜索的整体运行时间。在这个思想的指导下,基于dijkstra算法的A*路径搜索算法被提出。dijkstra算法维护了一个代表距离起点具体的dis数组,而在A*算法中维护了三个数组:
g数组,g[n]代表从起点a到节点n的最短距离,等价于dijkstra中定义的dis数组,称之为实际costh数组,h[n]代表n点到终点b的预估距离,也叫启发cost,这个预估距离的函数可以任意自定义f数组,f[n]=g[n]+h[n],代表经过n点的从a到b的局部最优路径的估算距离,
在给出上述变量的定义之后,我们再来描述A*路径搜索的算法流程:
- 把起点
a放到A中,把每条以a为起点的边的终点v放到B中,更新g[v] = cost(a,v),同时计算h[v]以及f[v]。 - 从
B中找出f[u]最小的节点u,- 如果
u==b则构造结果路径列表L,执行do L.push(u), u=pre[u] while u!=a,执行结束之后reverse(L)就是我们所需要的一条a->b的最短路径 - 如果
u!=b,则将u放到A中。
- 如果
- 把每条以
u为起点的边的终点v进行如下处理:- 如果
v已经在集合A中则不做处理, - 如果
v不在集合A中,则加入到集合B, 计算temp_cost= g[u]+cost[u,v],如果h[v]没有计算过则计算h[v]。如果这个temp_cost小于已经记录的g[v]则更新g[v]=temp_cost同时更新pre[v]=u, f[v]=g[v]+h[v]。
- 如果
- 重复步骤
2和步骤3,直到B为空
如果算法结束时仍然无法从步骤2中返回结果路径,则代表a,b两点之间无连通路径。下图中就是将h[n]设置为n->b的最短距离时的A*执行结果:

A*算法的执行速度和路径质量严重依赖于h[n]函数的选择:
- 如果
h[n]等价于n到b的最短路径的cost,则搜索时只会遍历到a,b最短路径上的节点,此时执行速度最快,同时返回的也是最短路径 - 如果
h[n]小于n到b的最短路径的cost,则搜索时会遍历到更多的节点,但是返回的依旧是最短路径。h[n]越小则遍历的节点越多,算法执行的越慢。如果我们将启发函数h[n]设置为返回常量0之后,上述流程就等价于dijkstra的算法流程,返回的是最短路径 - 如果
h[n]大于n到b的最短路径的cost,则算法终止时返回的路径不保证是一条最短路径,但是此时遍历到的节点数量相对于dijkstra来说会少。这个h[n]与真实距离之间的差异越大,遍历的节点空间数量越少。当h[n]相对于g[n]有数量级的差异时,退化为纯的贪心搜索,即步骤2每次获取离目标点预估距离最小的点。
如果我们无法指定一个简单高效的函数来获取n到b的最短路径的cost,所以我们只能希望将h[n]设置的足够大来加速算法的执行,但是太大又会退化成贪心算法影响路径的质量(完全不考虑最短路径的计算结果)。所以实际使用中一般将h[n]设置为在无障碍物的情况下n->b的最短路径距离乘以一个(1,10]区间内的系数。在无障碍物的情况下,在四方向移动时的两点之间的最短距离为曼哈顿距离(Mahattan Distance),其计算方式为计算两点的坐标插值的两个分量的绝对值之和,计算时间为常数,在八方向移动时也有一个类似的切比雪夫距离(Chebyshev distance),其计算时间也是常数。另外还有一个常规的距离计算函数欧几里得距离(Euclidean distance),这个就是平面中两点的直线距离,如果选用这个函数作为启发距离计算,则要小心的挑选其常数系数,以保证最后的结果要大于最短路径距离。下图就是在支持八方向移动的地图中使用常数为1的欧几里得距离时的A*算法执行结果,可以看出获取了一条最短路径,同时节点搜索空间还是有很多冗余:

A*算法是所有带空间位置信息的寻路搜索算法中应用最广的算法,通过添加到目标点的启发距离cost,算法迭代时以贪心的方式选择下一个需要考虑的节点,可以极大的减小CloseSet(A)、OpenSet(B)的大小,从而加速整体的路径搜索流程。不过在可以八方向移动的二维网格上,有一个更优的JPS(Jump Pointer Search)算法,这个算法是由Daniel Harabor和Alban Grastien在2011年的论文Online Graph Pruning for Pathfinding on Grid Maps中提出的。在2012到2014年举办的三届基于Grid网格寻路的比赛GPPC(The Grid-Based Path Planning Competition)中,JPS已经被证明是基于无权重格子,在没有预处理的情况下寻路最快的算法。这种近乎于标准答案的JPS算法最终导致了GPPC的停办。
JPS算法也是基于A*算法的思想改造而来,其主要差异在于选取了一个点进入CloseSet之后应该将哪些点引入OpenSet中。在A*算法中,会将这个点的周围八个邻居节点中的所有非障碍物节点都尝试加入到OpenSet中,而JPS算法则会排除掉很多不必要的邻居节点。下面我们来分析JPS的邻居节点排除过程,这里我们规定沿着坐标轴移动到邻居的cost为1,而沿着对角线移动到邻居的cost为sqrt(2)。

我们先来考虑沿着坐标轴移动的情况,由于四个坐标轴的分析是对称的,所以我们只讨论向右移动的情况,见上图中的a。此时我们获取了一个新的CloseSet节点x,其父节点是网格4,我们来决定接下来将如何筛选x的八个邻居进入到OpenSet。对于这八个节点中的某个节点n,如果总是可以将4->x->n替换为一条更优的路径4->m或者4->x->m,则我们可以把这个点n安全的筛除,不再考虑进入OpenSet:
- 首先可以排除的是节点
4,因为4->x->4是一个回路,在路径上删除此回路会获得一条更短的路径, - 然后可以排除
1,2,6,7这四个点,因为4可以直接连接这四个点,直连距离都比经过x后的距离短, - 最后排除
3,8两个点,因为4->x->3的长度等于4->2->3的长度,4->x->8的长度等于4->7->8的长度。所以某条路径中如果包含了4->2->3,总是可以安全的替换为4->x->3,此时出现了等价最短路径的情况。从4移动到节点3出于贪心的考虑我们会选择离3最近的节点2而不是节点x。因此我们将节点3排除,同样的理由我们排除了节点8。
所以在无障碍物的情况下,由4->x之后,我们只需要考虑将节点5加入到OpenSet中。接下来我们考虑存在障碍物的情况:
- 如果节点
1或者节点6是障碍物,不影响之前的判断,因为路径更优的要求下,我们在任何情况下都不会移动到1或者6 - 如果节点
2或者节点7是障碍物,由于上下对称我们只讨论2是障碍物的情况,即上图c,此时4->2->3不再连通,4->x->3成为了4->3之间的唯一一条最短路径,因此我们无法将3从下一跳节点中排除,需要加入OpenSet。同样的理由如果7是障碍物,我们需要将8加入到OpenSet。这种原来不考虑进入OpenSet的点,在出现了障碍物之后需要进入OpenSet,定义为强制邻点(Forced Neighbours),此时的x被定义为跳点(Jump Point)。 - 如果节点
3或者节点8是障碍物,也不影响之前的判断,因为3和8不会出现在基于目标距离优先的经过4->x的搜索路径中 - 如果节点
5是障碍物,则5不再加入到OpenSet中,
基于类似的分析方式,在上图c描述的在无障碍物的情况下的对角线移动6->x中,我们需要将2,3,5加入到OpenSet;在4为障碍物时,需要加入节点1;在7为障碍物时需要加入节点8。
所以JPS算法就是基于跳点的搜索,不过这里的跳点不仅仅包括前述的由于强制邻居引发的跳点,还包括另外的两种跳点:
- 路径的起点和终点被定义为跳点
- 当
a->b是一个对角线移动时,由b可以只按照坐标轴移动可以连接到某个跳点,则b也是跳点
给定了跳点的完整定义之后,我们定义一个函数jps_search(m,n)代表从m移动到n之后如何在n点进行继续的搜索:
- 如果
n是终点,则返回 - 对
n进行坐标轴四方向直线搜索, 每个方向直线搜索的停止条件如下:- 遇到障碍物
- 超过地图边界
- 遇到已经在
OpenSet或者CloseSet中的点 - 遇到一个跳点
k,此时将此跳点k加入到OpenSet,并记录k的父节点为pre[k]=n停止搜索前遇到的点都加入到CloseSet中。
- 如果上面的坐标轴四方向搜索没有遇到终点,则分别对四个对角线方向的直接邻居
k记录其父节点为pre[k]=n,并递归执行jps_search(n,k)有了这个jps_search的定义之后,通过JPS算法获取(a,b)之间路径的A*流程如下: - 将
a加入到OpenSet(B),记录pre[a]=a - 从
B中获取综合cost最小的节点n,加入到CloseSet(A)- 如果
n是终点,则利用pre数组计算完整路径 - 如果
n不是终点,则调用jsp_search(pre[n], n)进行邻居搜索
- 如果
- 不断的执行步骤
2,直到OpenSet为空
下面我们以一个实际例子来描述JPS搜索的执行过程,初始设置见下图,绿色为起点,红色为终点,黑色为障碍物:

开始时以这个绿色节点执行jps_search,由于坐标轴方向都是障碍物或者地图边界,多次递归之后执行到途中的黄色节点:

这里之所以会停留在黄色节点是因为浅紫色节点有一个强制邻居即图中的紫色节点,所以浅紫色节点为跳点,而黄色节点可以水平移动到浅紫色节点,根据跳点的定义黄色节点也是跳点,所以本次jsp_search会以黄色节点加入到OpenSet结束。开始下一轮迭代后,从OpenSet中获取了这个黄色节点,并执行jps_search,




到上图之后遇到了作为终点的跳点,因此终点之前的点也为跳点,加入到OpenSet,再执行一次jps_search之后,OpenSet中只剩目标点,此时构造最终的路径:


JPS算法相对与A*搜索的优越性在于其OpenSet减少了很多,因此每次迭代时从OpenSet里获取最小值代价很低。不过其CloseSet相对于A*来说增大了很多,因为其四方向坐标轴搜索很有可能探寻到地图边界,所以真正的实现中一般使用bit数组来表示CloseSet,相对于使用std::unordered_set这样可以显著的降低维护CloseSet的内存和CPU时间。
基于体素的三维空间寻路
前面章节讨论的是在二维平面上的网格寻路,然而新千年以来的新游戏基本都是在三维空间内。在三维空间内的寻路可以继续沿用我们在二维空间内网格寻路的相关经验,采用A*算法来搜索连通路径。不过此时的寻路基本单元不再是平面空间内代表长方形的的cell,而是三维空间内代表长方体的的体素voxel,原来存储节点障碍物信息的二维数组也要拓展为三维数组,寻路时查找一个节点的邻居从二维连通变成了三维联通。如果允许体素的对角线连通的话,一个体素可能的邻居节点会有26个,这大大的增加了搜索空间的膨胀速度,所以一般来说体素寻路时只允许体素进行坐标轴方向的移动,一个体素可能的邻居节点数量降低为了6个,此时A*搜索时的启发函数可以使用三维空间的曼哈顿距离,后续的讨论中我们将使用此设定。

不过实际上并不会采取三维数组的形式来存储这些信息,因为其内存占用太大了。对于一个1km*1km*1km的空间,如果采用0.25m*0.25m*0.25m的立方体作为体素的大小,且每个体素占用一个字节,则整体体素的数量为4000*4000*4000字节,约为60GB,即使按照bit来存储也会消耗7GB,这种内存占用是无法接受的。考虑到一般情况下三维场景中障碍物的体积占比很小,绝大部分的体素都是不带有阻挡的,我们可以不存储非阻挡的部分来降低存储整体体素数据的内存占用。同时阻挡区和非阻挡区域很多时候都是由几何体构成的,在一定空间内是连续的。主流的利用这种阻挡体素的稀疏性和连续性的存储结构主要有两种:一种是高度场(height field),另外一种是八叉树(voxel tree)。
使用高度场存储体素
在定义高度场的存储结构之前,我们先定义一个叫span的概念,这个概念对应垂直坐标轴里一段连续的障碍物体素或者空体素:
struct span
{
int32 height:16; // 当前span的底部高度
int32 length:15; // 当前span的连续长度
int32 is_blocked:1; // 当前span是否是障碍物
};
原有的多个体素所要求的内存空间被一个只占用4个字节的span所代替,因此在内存上节省了很多。在三维空间的xz平面中我们构造一个二维的grid,grid里每个cell都包含对应平面坐标从最低点到最高点的所有span,同一个cell里的所有span按照其height增大的顺序进行排列,相邻的两个span的空间是连续的,即下面的span的height +length + 1等于上面span的height:
struct cell
{
// 最后一个span的高度为场景最大高度 is_blocked 设置为true 同时length 为1
span* span_vec;
int32 span_num;
};
struct height_field
{
std::vector<std::vector<cell>> xz_cells;
};

对于一个1000*1000*1000的空体素场景来说,每个cell只需要存储一个span,此时的高度场的总内存大小为1000*1000*(12 + 4)=16MB。
使用span对体素数据存储之后,内存占用减少了,但是对于寻路搜索时却增加了难度,此结构在搜索路径时引入一个重要的问题:如果找到包含(x,y,z)对应的体素的span。此时我们可以利用cell中所有span都按照height增加的方向进行排列的特性,使用二分查找即可找到对应的span。
span* cell::find_voxel(int32 y)
{
// 这里我们保证最后一个span的高度就是场景的最大高度 同时长度为0
// 所以temp_iter指向的内存区域永远有效
auto temp_iter = std::lower_bound(span_vec, span_vec + span_num, y, (const span* temp_span, int32 temp_value)
{
return temp_span->height < temp_value;
});
if(temp_iter->height == y)
{
return temp_iter;
}
if(temp_iter == span_vec)
{
return nullptr;
}
temp_iter--;
return temp_iter;
}
由于span内包含多个体素,为了记录路径搜索时路过的体素点,我们定义一个新的结构:
struct voxel_node
{
int32 x,y,z; // 体素对应的坐标
const span* voxel_span; //体素所在的span
};
接下来我们继续考虑搜索时如果遇到了一个voxel_node,如何获取其可连通的其他voxel_node:
-
对于竖直方向,直接获取当前
span里当前体素对应的的上下两个邻居体素的voxel_node加入到OpenSet,如果某个方向到达了边界,则不添加此方向的邻居体素 -
对于其他方向,计算对应的
(x,y,z),找到对应的cell,然后使用find_voxel来找到对应的span,如果此span不是阻挡体,则构造voxel_node(x,y,z,span)加入到OpenSet
采用这种方式来计算邻居并加入OpenSet会导致OpenSet变得很冗余,在一个完全连通的n*n*n的立方体区域执行两个对角的寻路会将立方体内的所有体素加入到OpenSet中,空间复杂度为距离的三次方,实际使用时这种复杂度完全无法接受。因此我们需要使用前述的JPS寻路的思想,利用span的连续性,去除冗余节点。假设当前点m在span_a中,通过span_a的另外一个点n直接连通到了xz平面相邻的某个span_b的点o,构造成了一条m->n->o的路径。此时找到m到span_b上距离最短的点i,此时我们可以证明m->i->o的路径长度一定不比m->n->o长。由于两条路径在xz平面上移动的距离一样都是1,所以我们只需要讨论这两条路径在y轴上的不同:
-
如果
m的y大于等于span_b的最大高度,则i点为span_b上高度最高的点,(m,o)之间的高度差等于(m,i)之间的高度差加上(i,o)的高度差,即m->i->o是一条从m到o的最短路径 -
如果
m的y小于span_b的最小高度,则i点为span_b上高度最低的点,(m,o)之间的高度差等于(m,i)之间的高度差加上(i,o)的高度差,即m->i->o是一条从m到o的最短路径 -
如果
m的y在span_b的高度区间内,则i点的高度等于m的高度,(m,o)之间的高度差等于(i,o)的高度差,即m->i->o是一条从m到o的最短路径
TODO 这里加入三张图来做图例
在这三种情况下我们得到了m->i->o的路径长度一定不比m->n->o长的结论,所以所有由m出发到span_b的路径中都只需要考虑i点即可,i点类似与JPS搜索中的跳点概念。所以当我们从OpenSet中选取到一个voxel_node后,获取与这个voxel_node->voxel_span直接连通的所有span,计算当前voxel_node到这些span的最近点,加入到OpenSet中。这里需要在cell上来添加一个辅助函数,来查询与[a,b)区间有交集的所有非空span:
vector<const span*> cell::find_spans(int32 a, int32 b) const
{
auto temp_span = std::lower_bound(span_vec, span_vec + span_num, a, (const span* temp_span, int32 temp_value)
{
return temp_span->height < temp_value;
});
if(temp_span != span_vec)
{
temp_span--;
}
vector<const span*> result;
while(temp_span->height < b)
{
if(temp_span->height + temp_span->length > a)
{
result.push_back(temp_span);
}
temp_span++;
}
return result;
}
对于一个span来说,其离m的最近点只有一个,所以OpenSet中元素的标识可以使用span来代替,而不是用(x,y,z)。此时基于体素的寻路搜索就变成了基于span的寻路搜索,如果目标点在当前span内,则不再执行搜索,直接将目标节点的父节点设置为当前节点,然后使用父节点信息计算起点到终点的连通路径。原来的空心立方体的对角搜索最坏情况降低到了n*n个span,如果采取对角线优先的启发距离,则可以进一步降低到2*n。
使用八叉树存储体素
前述的使用高度场来存储体素数据已经起到了很好的内存削减和寻路搜索加速的效果,不过这个高度场信息只利用了体素数据在y轴上的连续性,在xz这两个轴上的连续性没有被使用。这样就导致了在一个1000*1000*1000的空体素场景中,高度场的总内存大小为1000*1000*(12 + 4)=16MB。为了进一步利用三个轴的体素连续性,研究人员提出了八叉树(Octree)这种结构:

八叉树是一种树形结构,每个非叶子节点都会有八个子节点,每个节点都对应场景中的一个立方体区域,子节点的立方体边长是父节点立方体边长的一半。场景体素的八叉树存储结构采用自顶而下的方法来构建:
-
获取场景的最小立方体包围盒,构造顶层节点,并加入到未处理节点的队列之中
-
每次从队列中获取一个节点,判断这个节点对应的立方体区域是否全为阻挡或者全连通,
-
如果为全阻挡或者全连通,此时不需要继续划分子节点,只需要在当前节点设置上阻挡标记或者连通标记
-
不是全阻挡也不是全连通,需要划分为八个子节点,并将这八个新子节点加入到节点处理队列中
-
虽然这个八叉树是一个树形结构,不过实际的内存中使用的是基于数组的存储:
struct octree_node
{
uint32 x:21;
uint32 y:21;
uint32 z:21;
uint32 first_child; // 在下一个layer中他子节点的开始位置索引
uint32 parent:31; // 父节点在上一个layer中的索引 对于顶层节点来说这个值无意义
uint8 is_full_block: 1; //是否是全连通或者全阻挡 只有first_child无效时才有意义
};
struct octree_layer
{
uint32 layer_index;
float voxel_length;
std::vector<octree_node> nodes;
};
struct octree
{
std::vector<octree_layer> layers;
};
最顶层的octree_node为octree.layers.back().nodes[0]。如果octree_node有子节点,则其八个子节点按照特定的顺序在下一层octree_layer::nodes中连续排列,而octree_node->first_child则是第一个子节点在对应下一层octree_layer::nodes中的索引。如果octree_node没有子节点,则first_child设置为numeric_limit<uint32>::max()。octree_node中的(x,y,z)代表以顶层立方体包围盒最低坐标为原点,以当前octree_layer::voxel_length为体素边长计算出来的当前octree_node的体素坐标,这里为了节省空间三个坐标合并为了一个uint64,这也限制了最大层级为21。在最细粒度的体素边长为25cm时,21层所覆盖的最大边长为256km,短期内应该不会有游戏场景的大小超过此长度。
使用这样的结构来存储体素数据可以节省更多的内存占用,对于一个空场景而言只需要存储一个顶层的octree_node即可,也就几十个字节,相对于使用高度场数据的20MB来说是一个非常了不起的优化。如果一个1024*1024*1024的场景里底面最细那一层全是阻挡体,则引入的所有新节点带来的内存占用为1024*1024*2*16*8/7=36MB,而高度场需要加入的新span所需内存为1024*1024*4=4MB,加上之前空场景所需的16MB也比八叉树占用的数据小。为了继续维护内存需求小的优势,我们需要利用一个octree_node的所有八个子节点是连续分配的特点,修改相关结构体的定义:
struct voxel_pos
{
uint32 x:21;
uint32 y:21;
uint32 z:21;
};
struct octree_node
{
uint32 first_child: 28;
uint8 x_diff:1;
uint8 y_diff:1;
uint8 z_diff:1;
uint8 is_full_block:1;
};
struct first_child_info
{
voxel_pos pos;
uint32 parent;
};
struct octree_layer
{
uint32 layer_index;
float voxel_length;
std::vector<octree_node> nodes;
std::vector<first_child_info> first_child_infos;
};
这里每个octree_node不再存储其体素的(x,y,z)坐标,而是存储其体素坐标相对于其父节点的第一个子节点的体素坐标的差异值。此时第一个子节点的体素位于这八个连续子节点所代表的2*2*2立方体的最低点,其他的七个点与first_child的体素坐标差异在三个轴上最大值都是1,所以这里我们用x_diff,y_diff,z_diff三个bit来存储。同时八个子节点的排列方式按照(x_diff * 4 + y_diff*2 + z_diff)递增的顺序,这样获取了任意一个octree_node的指针A之后都可以通过A-(A->x_diff * 4 + A->y_diff*2 + A->z_diff)的形式来获取这八个连续子节点的第一个节点的指针,进而可以获得第一个节点在nodes中的索引。对于每个作为first_child的octree_node都在对应octree_layer::first_child_infos里有一个元素来存储他的完整体素坐标和父节点的偏移。利用这些信息我们可以恢复之前定义在octree_node中的parent和(x,y,z)信息:
first_child_info octree_layer::get_first_child_of_parent(const octree_node* A) const
{
auto first_child_of_A_parent = A-(A->x_diff * 4 + A->y_diff*2 + A->z_diff);
auto first_child_idx = first_child_of_A_parent->nodes.data();
return first_child_infos[first_child_idx/8];
}
uint32 octree_layer::get_node_parent(const octree_node* A) const
{
return get_first_child_of_parent(A).parent;
}
voxel_pos octree_layer::get_node_pos(const octree_node* A) const
{
auto first_child_pos = get_first_child_of_parent(A).pos;
first_child_pos.x += A->x;
first_child_pos.y += A->y;
first_child_pos.z += A->z;
return first_child_pos;
}
通过这样的优化,每个octree_node只需要占用4个字节,再平摊一下first_child_infos的开销,一个最底层的octree_node大概会引入6个字节的开销,相对于之前的16*8/7=18来说节省到了原来的1/3,这样相对于高度场来说降低了一半以上的内存消耗,继续保持了其内存占用上的优势。
搞定了内存存储结构之后,我们开始使用这个八叉树结构来使用A*搜索计算两点之间的连通路径。此时我们面临之前使用稀疏高度场同样的问题:给定一个体素坐标(x,y,z),如果获取其对应的octree_node和对应的octree_layer。这里我们可以利用八叉树的节点性质来自顶向下查询,为了让代码更简洁我们使用第一个版本的结构体定义:
std::pair<const octree_layer*, const octree_node*> octree::get_node(uint32 x, uint32 y, uint32 z) const
{
auto total_layer = m_layers.size();
vector<uint8> offsets;
while(total_layer > 0)
{
uint8 temp_offset = (x%2) * 4 + (y%2) * 2 + (z%2);
x>>=1;
y>>=1;
z>>=1;
offsets.push_back(temp_offset);
total_layer--;
}
uint32 first_child_offset = 0;
uint32 query_layer = m_layers.size();
while(!offsets.empty())
{
auto temp_offset = offsets.back();
offsets.pop_back();
const octree_node* cur_node = layers[query_layer].nodes[first_child_offset + temp_offset];
if(cur_node->first_child == std::numeric_limits<uint32>::max())
{
return make_pair(&layers[query_layer], cur_node);
}
first_child_offset = cur_node->first_child;
query_layer--;
}
assert(false);
}
如果我们在排列octree_layer::nodes的时候按照octree_node对应体素坐标的递增序,则对nodes做二分查询即可获取一个体素坐标对应的octree_node,不过此时需要先查询最底层的layer,当查询不到时再递归向上查询。如果采取了节省内存的第二个版本结构体定义的话,我们可以使用first_child_infos来进行二分查询,因为first_child_infos也是按照voxel_pos增长序排列的。
解决了voxel_pos到octree_node的映射之后,我们也顺带解决如何获取一个octree_node的六个方向邻居的问题,不过这里获取邻居节点又有其特殊性,如果某个方向邻居节点是有子节点的,我们需要获取当前邻居节点所有层级的子节点中全连通且与当前节点连接的子节点,这里我们需要一个函数来快速的判定两个带layer信息的voxel_pos对应的体素是否直接邻接:
// 计算一个voxel pos在上一层layer中父节点对应的voxel_pos
void voxel_pos::to_parent()
{
x = x>>1;
y = y>>1;
z = z>>1;
}
bool is_voxel_pos_linked(uint32 layer_a, voxel_pos pos_a, uint32 layer_b, voxel_pos b)
{
if(layer_a < layer_b)
{
swap(layer_a, layer_b);
swap(pos_a, pos_b);
}
while(layer_a != layer_b)
{
b.to_parent();
layer_b++;
}
// 在同一层级上的两个节点直接连接的条件是 x y z三个轴的差值有且只有一个是1
int64 diff_value = 0;
diff_value += abs(int64(pos_a.x) - pos_b.x);
diff_value += abs(int64(pos_a.y) - pos_b.y);
diff_value += abs(int64(pos_a.z) - pos_b.z);
return diff_value == 1;
}
至此,A*搜索时的节点定位问题和邻居查找问题都解决了。
如果场景在轴上的体素个数大于4096,八叉树的内存需求又变得严峻起来。在一个包含了4096*4096*4096的三维空间中,假如xz平面的底面平铺一层阻挡体,即使使用压缩内存版本的结构体定义,八叉树的内存占用为4096*4096*2*6=192MB。考虑到以0.25cm作为体素单位长度的情况下,4096个体素对应的边长为1km,1km*1km*1km的场景需要这样的内存需求大小还是难以接受,特别是现在游戏场景的边长都超过8km,逐渐往16km狂奔。仔细分析一下每一层的节点内存信息,发现最底层的节点数量大约是其他所有层节点数量总和的7倍,而最底层的所有octree_node是没有子节点的,所以first_child字段是可以删除的。在压缩内存的结构体定义中删除占据28bit的字段导致只剩4bit,即可以进一步节省内存到原来的1/8,从而可以将总体内存消耗降低到1/4。基于这样的字段压缩思想,游戏界提出了稀疏八叉树SVO(Sparse Voxel Octree)。SVO在字段压缩上更为激进,他将原来底下三层数据的信息合并为一层特殊的leaf_layer,同时在leaf_layer中只存储uint64,每个uint64代表一个4*4*4的体素立方体里每个体素是否是阻挡体的信息,即一个体素只需要1bit来存储。此时稀疏八叉树的结构体定义如下:
struct leaf_node
{
uint64 value;
};
struct voxel_pos
{
uint32 x:21;
uint32 y:21;
uint32 z:21;
};
struct octree_node
{
uint32 first_child: 28;
uint8 x_diff:1;
uint8 y_diff:1;
uint8 z_diff:1;
uint8 is_full_block:1;
};
struct first_child_info
{
voxel_pos pos;
uint32 parent;
};
struct octree_layer
{
uint32 layer_index;
float voxel_length;
std::vector<octree_node> nodes;
std::vector<first_child_info> first_child_infos;
};
struct svo_tree
{
vector<octree_layer> layers;
vector<leaf_node> leafs;
};
最底下一层的octree_layer里octree_node::first_child指向的是svo_tree::leafs里的偏移量。下面是一个图形化的结构示例:

在采用此结构来存储体素数据之后,在一个包含了4096*4096*4096的三维空间中,假如xz平面的底面平铺一层阻挡体,稀疏八叉树的内存占用为(4096/4)*(4096/4)*(8+5)=13MB,相对于之前的192MB简直妙手回春医学奇迹!不过这样的内存优化也有其劣势,即路径搜索获取一个节点的邻居时,如果遇到了最下面一层的layer,需要计算邻接当前节点的leaf_node对应面的16个体素是否可以直接连通。此外加入到OpenSet里的数据还要区分是正常的octree_node还是leaf_node。为了在搜索时对这两类node做统一存储,我们定义一个svo_link的结构:
struct svo_link
{
int32 layer: 4; // 当前node所在的层级
int32 node_offset: 22; // 当前node在对应层级的nodes里的索引
int32 leaf_cube_offset: 6; // 如果layer为0 且node不是全阻挡或者全联通 则这六个bit代表对应的leaf_node value字段里的bit索引
};
由于A*搜索时如果遇到一个leaf_node需要考虑相邻的16个voxel对应的bit,最大会引入新的16个元素到OpenSet中,所以我们的启发函数最好加入一个对svo_link::layer的惩罚项,layer越小,惩罚越大。这样的启发函数设置会让路径优先使用更大粒度的octree_node,从而减少搜索时OpenSet的大小。
基于寻路网格的三维地表寻路
前面章节介绍的是一个可以六方向移动的单位如何在三维场景中使用体素数据进行寻路,可以用来解决空中寻路和水下寻路的问题。但是这种寻路方式与我们常见的游戏中的entity移动方式很不一样,常见的游戏中我们控制的角色和服务端创建的NPC都是只能在特定物体的表面进行寻路的,同时要求这些物体表面的斜率不能太大。为了解决这种最常见的地表寻路问题,我们首先需要解决如何定义三维空间中的可行走表面。例如我们可以继续利用之前定义的三维空间的高度场span体素数据,只使用可连通span的底面体素作为可行走表面,执行A*寻路时OpenSet中只支持span的底面体素。如果从span(A)的底面移动到span(B)底面面带来的体素高度差大于特定值,则认为这条连接斜率过大,不再具有连通性。加入这些限制之后,我们就可以继续用体素进行三维地表的寻路了。不过之前也提到了使用高度场会带来非常大的内存需求,1024*1024*1024的空场景都需要1024*1024*(12 + 4)=16MB的内存,在常规使用的人物地表移动所需的体素精度一般为25cm,此时16MB只能表示边长为256M的立方体区域,对于常规的2KM左右边长地图来说,所需的内存就会膨胀到160MB,完全不具有可行性。
为了使用较少的内存来解决三维地表寻路的问题,研究人员提出了寻路网格NavMesh(Navigation Mesh)这样的数据结构。在NavMesh中,可行走地表区域不再使用体素来表示,而是使用多个凸多边形Convex Polygon来表示。如果两个凸多边形之间共享了一条边,则这两个凸多边形可以通过这条边进行连通。下图中就是一个使用NavMesh表示的地图可行走区域,绿色区域代表可行走区域,亮绿色的线段表示凸多边形的边,所使用的凸多边形为三角形:

从上图中可以看出,只需要三个点组成的三角形即可描述一块非常大的平坦开阔区域。对于一个完整的空正方形场景,仅需两个三角形即可描述全场景的寻路表面,相对于直接用体素span来描述这块可行走表面来说内存节省了很多。在实际使用中的1km*1km左右的地图中,其对应的NavMesh一般不会超过1M。正因为NavMesh在内存上的优势,导致现在基本所有的游戏引擎在寻路功能上都采用基于开源的RecastNavigation来构造三维空间的地表寻路NavMesh数据。

由于RecastNavigation作为寻路的基础解决方案的重要性,本书中后续将使用一整个章节去介绍RecastNavigation的原理细节,下面将只简要介绍一下如何在二维场景中构造NavMesh的基本流程。
在二维场景中构造NavMesh我们依然需要以来grid数据,我们用特定粒度的正方形网格对地图grid进行采样,来标记grid中的可行走区域和不可行走区域。在下图中我们用一条首尾相连的曲线来描述可行走区域的边缘:

有了整个grid数据之后,我们获取整个可行走区域的内测边缘,获取一个连通线段组成的回路:

由于按照坐标轴构造的线段过于曲折,接下来按照一定的规则删除部分不必要的端点,构造一个形状基本相似的简化多边形:

有了这个简化多边形之后,下一步需要将简化多边形切分为多个凸多边形,这里最经典的方法是使用Delaunay方法进行三角形划分。不过我们这里并不要求切分为三角形,所以采用一个基于贪心的方法。遍历每个点A,获取多边形中对应最近的点B,如果(A,B)不是一条现有的线段,则将线段加入到待处理优先队列,其优先级与线段的长度除以两点之间相连之前最短路径的长度的比值相关,比值越小,优先级越大:

利用上述规则,构造一个多边形处理队列,初始时将原始多边形放入其中。每次从队列中拿出一个多边形,如果此多边形不是凸多边形,则获取最优分割边将此多边形切分为两个子多边形,并加入到多边形处理队列中。不断处理直到队列为空,最终形成了下图:

上述方法只处理了可行走区域内无阻挡区域的情况,对于有阻挡区域时,对阻挡区域的多边形里的每个点都找到场景中离其最近且原本不连通的点,如果不是已经有的边且不与现有的所有边交叉则构造新边,边的添加优先级与边的长度成反比:

如果新添加的边形成了一个新的连通多边形,且不是凸多边形,则递归进行处理,最终得到了这样的结果:

这样我们就得到了一个包含了阻挡体的NavMesh,流程看上去很简单但是其实很多细节在里面。更多的细节将在后面介绍RecastNavigation的章节中进行提及,自此我们将认为从原始地图生成NavMesh是一个已经解决了的问题。
在拥有了描述场景地表数据的NavMesh之后,用这个数据来执行点与点之间寻路的流程替换成了凸多边形之间的路径搜索。不过这里我们首先需要解决如何查找点所对应的凸多边形。
判断点在是否在凸多边形内的算法是确定的,在凸多边形的顶点按照逆时针排列成为P(0),P(1)...P(n)的时候,检查目标点P是否在所有的P(a)->P(a+1)直线直线的同一边,如果都在同一边则认为此点在凸多边形内。这里判断是否在同一边就是计算向量P(a)->P 与向量P(a)->P(a+1)的叉积的值的正负性是否不变, 如果正负性发生改变则认为不在凸多边形内。

bool IsInConvexPolygon(Point testPoint, vector<Point> polygon)
{
//Check if a triangle or higher n-gon
Debug.Assert(polygon.size() >= 3);
//n>2 Keep track of cross product sign changes
auto pos = 0;
auto neg = 0;
for (auto i = 0; i < polygon.size(); i++)
{
//If point is in the polygon
if (polygon[i] == testPoint)
return true;
//Form a segment between the i'th point
auto x1 = polygon[i].X;
auto y1 = polygon[i].Y;
//And the i+1'th, or if i is the last, with the first point
auto i2 = (i+1)%polygon.Count;
auto x2 = polygon[i2].X;
auto y2 = polygon[i2].Y;
auto x = testPoint.X;
auto y = testPoint.Y;
//Compute the cross product
auto d = (x - x1)*(y2 - y1) - (y - y1)*(x2 - x1);
if (d > 0) pos++;
if (d < 0) neg++;
//If the sign changes, then point is outside
if (pos > 0 && neg > 0)
return false;
}
//If no change in direction, then on same side of all segments, and thus inside
return true;
}
有了这个判定方法之后,我们可以通过遍历NavMesh内的所有凸多边形执行上述函数来获取目标点所在的凸多边形,但是这样做的话效率太低了。我们需要一个更加快速的查询结构来应对静态Mesh场景中点对应的凸多边形查询,游戏中经常用层次包围体BVH(Bounding Volume Hierachy)这样的结构。所谓包围体Bounding Volume就是一个可以将任意几何体包围的简单形状,常见的包围形状包括轴对齐包围盒(axis-aligned bounding box,AABB 包围盒)、包围球(bounding sphere)以及定向包容盒子(Oriented Bounding Box,OBB)等。为了实现上讨论简单我们这里只使用AABB的包围盒,因为计算时只需要获取多边形内所有点在所有轴的最大最小值即可。

所谓的层次包围体技术 指的是将所有包围体分层逐次地再次包围,获得一个更大的包围体,直到包围住所有物 体。

构建BVH树的算法比较简单,采用的是一个自顶而下的树形构建方法:
- 初始时构造一个节点,节点内包含所有的多边形,
- 然后选用一条与轴平行的直线,将所有的多边形按照多边形包围盒中点在直线的左右侧进行分割,生成左侧多边形集合和右侧多边形集合,
- 分别构造左右子树对应这两个集合并递归的进行
BVH构建,直到集合内多边形数量大小小于特定值(例如5)。 - 构造完成之后,每个叶子节点计算其内部的所有多边形的包围盒的并集,作为此叶子的包围盒大小
- 每个非叶子节点的包围盒为两个子节点的包围盒的并集
上述算法的核心就是如何在步骤2中选取一条合适的与轴平行的直线,使得两个子树的大小均衡。实践中会采用这样的方法来选取:
- 首先获取节点对应的多边形集合在各个轴上所占据的区间的并集,计算轴上覆盖的长度,选取覆盖长度最长的轴
- 获取节点对应的多边形集合的包围体中心坐标在所选取的轴上的投影值集合,获取其中的中位数作为结果直线的坐标值

第一步的耗时与多边形数量成简单线性关系,第二步的计算可以直接使用std::nth_element来计算中位数,时间复杂度也是多边形数量的简单线性。所以自顶向下构造BVH的时间复杂度类似与快速排序的时间复杂度。对于静态场景的离线计算来说这样的时间复杂度是非常优秀的,而且其占用的内存只是原始多边形数据的几分之一。
有了BVH之后我们就可以快速的进行查询点所在的凸多边形了:
- 首先构造一个节点队列,将
BVH根节点放入队列中 - 每次从队列中
POP一个节点,检查目标点是否在此节点的包围盒中,如果在包围盒内- 如果当前节点不是叶子节点,则将当前节点的两个叶子节点放入节点队列等待后续处理
- 如果当前节点是叶子节点,遍历当前叶子节点内存储的凸多边形列表,使用凸包算法判定点是否在凸多边形内,如果在则作为结果返回
- 如果队列为空 则代表节点无对应的凸多边形
整体流程就是一个二叉树的递归下降流程,不过这里又有点不同,因为一个节点的左右两个子节点都可能会进入处理。所以查询的最优时间复杂度等价于树高,最坏时等价于整体多边形集合的大小,不过正常情况下的执行时间跟最优复杂度差不多。如果想构造更为均衡的BVH树来执行查询,可以参考使用基于表面积启发SAH((Surface Area Heuristic)的划分方法,具体细节参考PBRT的关于BVH的内容
解决了获取点对应的凸多边形这个问题之后,使用NavMesh计算寻路就是在计算凸多边形的连通路径。这里可以继续使用我们在A*搜索上的相关经验,不过在最终的路径生成时还需要结合NavMesh自身的一些特性来做优化。以下图中的NavMesh为例,我们要执行一次从C1到C8的连通路径查询:

根据共享边决定的凸多边形连通性,我们可以获得下面的这个红色多边形路径:

获取多边形路径之后我们获取相邻的两个多边形所共享的边,如下图中的粉色线段所示:

有了这些边之后我们可以很方便的构造出由线段组成的路径:选取每个多边形在连通路径上的两个共享边,将两条边的中点进行连接,并将起点和终点拼接到首尾,最终形成了下图中的橙色路径。

但是我们观察上图发现,浅蓝色路径也是一条通过多边形路径的线段路径,同时这条路径的总长度比黄色路径短很多。事实上经过首尾两点并通过指定多边形列表的线段路径有很多条,例如将多边形的重心连接起来也是一条满足需求的路径。为了找到连通两点并经过指定连通多边形路径的最短线段路径,我们需要使用拉绳算法。这个算法执行的是一个贪心过程,算法目标是计算出路径中所有的拐点,也就是线段之间的连接点。我们的目的是尽可能的减少拐点,以达到路径平滑同时路径最短的目的。算法的具体流程见下图:

初始的时候我们需要如下参数:
-
start作为寻路的起点 -
left,right作为中间临时变量 他们与start围成的三角形记录可行走区域 -
polys记录剩下的还未探索的多边形
初始时可以把left right 设置为第一个Poly边上的两点, 即A图。接下来,需要依次移动左右两条边,按顺序处理路径上的Poly。每次遇到一条新的边,考察这个边的left2 right2与start之间的连线与原来的left right线段之间的关系。其实left2 right2总是会有一个点与left right重合,所以只需要考虑新加入的节点即可。
-
如果新加入的节点与
start之间的线段与原来的left->right线段相交,则更新left right为left2, right2,即缩小可行走三角形区域 如上图中的B, C,D -
不相交的话,我们需要将
left right里与left2 right2重合的点设置为拐点,加入到拐点路径里, 然后将这个点当作新的start,下一条Poly边的left right作为新的left right,一路迭代下去, 如图F G
整个算法一路迭代,处理了最后一个多边形之后,判断最后一个拐点是否在此多边形与终点所在的多边形共享边上:
- 如果在共享边上,则直接拼接终点到已经计算出来的路径之后作为最后的起点到终点之间的连通路径。
- 如果不在共享边上,
- 如果最后一个拐点与终点之间的线段不会离开连通路径中的多边形,则直接把终点加入到拐点最后,生成一条连通路径
- 如果连线会离开连通路径中的多边形,则选择拐点经共享边两个端点连接到终点的这两条路径中的长度最小者,作为最后的拐点,并把终点拼接到最后,生成一条连通路径
对于三维场景中的凸多边形列表,我们可以将这些凸多边形都投影到xz平面,再执行上述算法。获取平面路径之后,再计算这条路径与平面投影共享边相交的点,将这些点转换为三维平面对应共享边的点之后,插入到原有路径中,即可获取三维场景中拉绳算法计算出的最短路径。
游戏中的随机
游戏中的很多系统会使用随机性来构造更加丰富的游戏体验。由于随机结果是无法预测的,所以随机判定发生了那种远超玩家预期的结果时会给玩家带来一些值得记忆与分享的惊喜时刻。其中随机性特性最明显的场景主要有如下两类:
- 依据指定概率触发某种事件,攻击触发暴击、被动晕就是这种。打过
Dota的玩家想必都体验过白牛无限晕、蓝胖四倍点金的快乐:

在暗黑破坏神2中刷了几十年的的玩家想必都知道满变量悔恨的含金量:

- 依据指定权重分布来生成某种物品道具,玩家常说的开宝箱就是这种。但是一般来说这些随机事件的概率都不是很高,所以大部分时候玩家对于这种随机性感受到的体验都比较负面。这种概率过低的随机造成了无数个玩家即使在暗黑破坏神2中游荡了几十年依然没有收集齐所有的符文,并在阴阳师大火的时候获得了“非酋”这一专属荣誉称号。各位“非酋”也试图在游戏内表演斋戒焚香沐浴更衣等各种行为艺术来试图增加这些物品的掉落概率,这也就造成了游戏界口口相传的一句话:
玄不救非, 氪不改命
在受到无数玩家的口诛笔伐之后,最终相关监管部门提出了游戏公司必须提前公布随机掉落概率这一规定。不过这种公布概率的规定也并没有抚平各位“非酋”心中的创伤,也没有扭转他们之前的各种玄学行为。
为此本章就来介绍一下游戏内对于这两种随机机制的具体实现,来解答玩家对这种随机性的疑惑。
依概率触发
依概率触发事件这种随机模式最广为人知的场景就是攻击时判定是否触发暴击、闪避、眩晕等这些特殊攻击效果,由于这些随机事件的概率一般都是在5%以上,所以玩家对于这种随机事件都是喜闻乐见。这种随机事件的判定计算也比较简单,利用随机数生成器生成一个[0,1]之间的浮点数a,然后与指定的发生概率b做比较,如果a>=b则代表这个随机事件判定通过可以发生。
虽然这种简单的实现虽然是正确的,但是由于攻击判定的频率比较高,可以达到一秒多次,每次随机判定之间毫无关系,这样就很容易出现短时间内若干的连续不触发或者连续的触发这种现象,也就是Dota玩家常说的“欧洲晕锤”与“非洲晕锤”。假设攻击时有25%的概率触发被动晕,则连续三次攻击中触发被动晕的次数呈如下分布,三次都触发眩晕的概率为0.0156:

这样的分布还是比较符合玩家直觉的,我们考虑进一步把攻击次数扩大到100,此时再来看一下触发次数的概率分布:

可以看出触发次数在[20,30]区间内的概率为0.8,在这个区间之外的概率仍然比较大超过0.2,连续10次没有触发的概率为,这个值仍然比较大。这种随机事件的由于其相互独立的性质导致出现与平均概率比较大差异的情况仍然是比较常见的。这种概率偏差对于观赏性来说是一种佐料,但是对于游戏平衡性来说却是有害的。因为这种刀刀烈火、无限被动晕现象的出现非常明显的影响了游戏的后续结算结果。对于竞技游戏来说这种完全不可控的随机性是要想方设法避免的,所以Dota2 6.81版本中率先引入了反击螺旋随机触发的伪随机机制:

伪随机分布(pseudo-random distribution,简称PRD)在DotA中用来表示关于一些有一定几率的装备和技能的统计机制。在这种实现中,事件的几率会在每一次没有发生时增加,但作为补偿,第一次的几率较低。这使得效果的触发结果更加一致。
效果在上次成功触发后第N次测试中发生的几率(简称proc)成功触发的几率为。对于每一个没有成功触发的实例来说,PRD系统为下一次效果触发的几率增加一个常数C。该常数也会作为初始几率,比效果说明中的几率要低,并且是不可见的。一旦效果触发,这个计数器就会重置。
技术上,如果以表示第次事件发生, 表示事件不发生。伪随机分布使用。同样地,当时,如果事件一直没有发生,那么在满足中的最小次判定时,该事件必然发生。 事件必然发生。因此概率分布为:
用更通俗的文字来阐述伪随机的做法是这样的:
- 对于某个以概率
P触发的事件,计算出一个数值C,同时建立一个变量N来记录已经连续多少次没有触发 - 每次采样来计算触发时,使用的概率不是
P,而是(N+1)*Q, - 如果此次采样触发了这个事件,则将
N设置为0 - 如果此次采样没有触发这个事件,则将
N设置为N+1
总的来说就是随着连续不触发的次数增大对应的触发概率也增大,这样总会导致概率大于1,从而引发一次事件触发。下面就是碎颅锤的伪随机例子:
对近战英雄,碎颅锤的重击有
25%几率对目标造成眩晕。那么在第一次攻击时,实际上只有大约8.5%几率触发重击。随后每一次失败的触发会增加大约8.5%触发几率。于是到了第二次攻击,几率就变成大约17%,第三次大约25.5%,以此类推。在一次重击触发后,下一次攻击的触发几率又会重置到大约8.5%。那么经过一段时间之后,这些重击几率的平均值就会接近25%。
为了保证这个PRD算法的正确,我们需要为每个P计算出对应的C出来,这里需要用到牛顿二分法来做数值逼近:
// 这个函数计算给定的C时对应的P
double PfromC( double C )
{
double pProcOnN = 0;
double pProcByN = 0;
double sumNpProcOnN = 0;
int maxFails = (int)std::ceil( 1 / C );
for (int N = 1; N <= maxFails; ++N)
{
pProcOnN = std::min( 1, N * C ) * (1 - pProcByN);
pProcByN += pProcOnN;
sumNpProcOnN += N * pProcOnN;
}
return ( 1 / sumNpProcOnN );
}
// 下面这个函数使用二分法来逼近 直到由这个C计算出来的P与传入的P基本相等
double CfromP( double p )
{
double Cupper = p;
double Clower = 0;
double Cmid;
double p1;
double p2 = 1;
while(true)
{
Cmid = ( Cupper + Clower ) / 2;
p1 = PfromC( Cmid );
if ( std::abs( p1 - p2 ) <= 0 ) break;
if ( p1 > p )
{
Cupper = Cmid;
}
else
{
Clower = Cmid;
}
p2 = p1;
}
return Cmid;
}
每次去运行时计算这个C还是比较耗时的,幸运的是对于常见的概率已经有现成的表格可以查询到对应的C值:
| C | 通常几率 | C近似值 |
|---|---|---|
| 0.003801658303553139101756466 | 5% | 0.38% |
| 0.014745844781072675877050816 | 10% | 1.5% |
| 0.032220914373087674975117359 | 15% | 3.2% |
| 0.055704042949781851858398652 | 20% | 5.6% |
| 0.084744091852316990275274806 | 25% | 8.5% |
| 0.118949192725403987583755553 | 30% | 12% |
| 0.157983098125747077557540462 | 35% | 16% |
| 0.201547413607754017070679639 | 40% | 20% |
| 0.249306998440163189714677100 | 45% | 25% |
| 0.302103025348741965169160432 | 50% | 30% |
| 0.360397850933168697104686803 | 55% | 36% |
| 0.422649730810374235490851220 | 60% | 42% |
| 0.481125478337229174401911323 | 65% | 48% |
| 0.571428571428571428571428572 | 70% | 57% |
| 0.666666666666666666666666667 | 75% | 67% |
| 0.750000000000000000000000000 | 80% | 75% |
| 0.823529411764705882352941177 | 85% | 82% |
| 0.888888888888888888888888889 | 90% | 89% |
| 0.947368421052631578947368421 | 95% | 95% |
下面我们来使用程序来模拟C=0.085的时候统计触发所需次数,对应程序的代码在https://github.com/huangfeidian/random_util/blob/main/test/test_trigger_by_prob.cpp上,下面的是统计了100次触发的所需次数分布结果:
| 第N次才触发 | 出现次数 |
|---|---|
| 1 | 7 |
| 2 | 17 |
| 3 | 21 |
| 4 | 22 |
| 5 | 16 |
| 6 | 8 |
| 7 | 5 |
| 8 | 4 |
从这个表格中可以看出,触发所需次数大多在[2,5]区间内,比较平均,同时大于8次才触发的情况并没有出现。而且完成这100次触发所执行的采样次数为1*7+2*17+3*21+4*22+5*16+6*8+7*5+8*4=387,对应的触发概率为0.258,与目标值非常接近了。
基于PRD的效果连续多次触发或多次不触发是非常罕见的。这使得游戏的运气成分大大降低,在Dota 2中一个有那么多带几率的技能的世界中增加了一致性。
加权随机选择
加权随机选择(Weighted Random Choice)就是广大玩家深恶痛绝的开箱子玩法,打开宝箱玩家可以获取多种物品中的一个。单个宝箱中不同物品出现的概率由策划配置的出现权重来指定,在配表中会以这样的vector<pair<item_id, item_weight>>的形式存在,同时指定item_id为0的配置项对应本次宝箱结算不生成任何物品。对于这样的一个配置数组A,设total_weight为这个数组中所有元素的item_weight之和,则第i个item对应的随机获取概率为A[i].item_weight/total_weight。一般来说这里的item_weight类型就是uint32,所以这个宝箱结算问题也被成为离散加权随机选择。
这个问题最简单的解法就是生成一个[0, total_weight)之间的随机数A,然后顺序遍历这个权重数组并扣除每个元素对应的权重值B,直到A<B就返回当前遍历到的元素对应的item_id作为选择结果:
struct item_weight_config
{
std::uint32_t item_id;
std::uint32_t item_weight;
};
float uniform_random(float begin, float end); // 负责等概率的生成[begin, end)的一个随机数
item_id random_select_1(const vector<item_weight_config>& weights)
{
assert(weights.size());
std::uint32_t total_weight = 0;
for(const auto& one_weight_config:weights)
{
total_weight += one_weight_config.item_weight;
}
std::uint32_t temp_random_value = std::uint32_t(std::floor(uniform_random(0, total_weight)));
for(const auto& one_weight_config: weights)
{
if(temp_random_value <one_weight_config.item_weight)
{
return one_weight_config.item_id;
}
temp_random_value -= one_weight_config.item_weight;
}
return weights.back().item_id;
}
这个算法简明易懂,就是执行过程中有两次遍历,如果一个配置会被用来结算多次,则可以使用预先计算total_weight来避免这个值的重复计算:
struct select_context
{
vector<item_weight_config> weights;
std::uint32_t total_weight;
void init()
{
total_weight = 0;
for(const auto& one_weight_config:weights)
{
total_weight += one_weight_config.item_weight;
}
}
};
item_id random_select_2(const select_context& ctx)
{
assert(ctx.total_weight);
std::uint32_t temp_random_value = std::uint32_t(std::floor(uniform_random(0, ctx.total_weight)));
for(const auto& one_weight_config: ctx.weights)
{
if(temp_random_value <one_weight_config.item_weight)
{
return one_weight_config.item_id;
}
temp_random_value -= one_weight_config.item_weight;
}
return ctx.weights.back().item_id;
}
这样就可以将两次遍历降低到一次遍历,所需执行时间降低到原来的一半。在这个优化基础上,我们还可以通过预先计算前缀和来避免遍历时对temp_random_value的更新,进一步降低遍历时的工作量:
struct select_context
{
vector<item_weight_config> weights;
vector<std::uint32_t> sum_weights;
void init()
{
sum_weights.resize(weights.size());
std::uint32_t total_weight = 0;
for(std::uint32_t i = 0; i< weights.size(); i++)
{
total_weight += weights[i].item_weight;
sum_weights[i] = total_weight;
}
}
};
item_id random_select_3(const select_context& ctx)
{
assert(ctx.sum_weights.size());
std::uint32_t temp_random_value = std::uint32_t(std::floor(uniform_random(0, ctx.sum_weights.back())));
for(std::uint32_t i = 0; i< ctx.sum_weights.size(); i++)
{
if(temp_random_value <ctx.sum_weights[i])
{
return ctx.weights[i].item_id;
}
}
return ctx.weights.back().item_id;
}
这样在选择时的循环遍历只需要做一次比较操作即可。由于权重永远是正数,所以sum_weights数组肯定是一个递增的序列,我们可以利用这个性质使用二分查找来找到第一个大于temp_random_value的元素序号,以替代顺序遍历:
item_id random_select_4(const select_context& ctx)
{
assert(ctx.sum_weights.size());
std::uint32_t temp_random_value = std::uint32_t(std::floor(uniform_random(0, ctx.sum_weights.back())));
auto temp_iter = std::upper_bound(ctx.sum_weights.begin(), ctx.sum_weights.end(), temp_random_value);
return ctx.weights[std::distance(ctx.sum_weights.begin(), temp_iter)].item_id;
}
在这样的搜索实现下,预处理之外的复杂度从数组大小N降低到了log(N),在N>10时就可有显著提升。
log(N)复杂度的算法已经是非常高效了,但是这个复杂度其实并不是加权随机选择问题最终的王者,还存在一种预处理之外只需要常数时间即可获得选择结果的最优算法。不过在介绍这个最优算法之前,我们需要先了解一下这个算法最初的一个效率不怎么高的版本。
假设随机的权重数组为[[0,6],[1,4], [2,1], [3,1]],即道具[0,1,2,3]各自的权重为[6,4,1,1],这四个道具的随机概率为[1/2, 1/3, 1/12, 1/12],其柱状图如下:

假设我们在上面图中的大矩形里随机选取一个点,所选点落入这四个柱形区域内的概率正比于这四个柱形的面积。由于是选点是均匀随机的,且柱形区域的底边长度一样高度等于随机概率,所以柱形区域的面积正比于其对应道具的随机概率。如果这个点在这四个带颜色的柱形区域内,则选择所在柱形对应的索引作为随机结果即可。但是上面图中右上角还有一大块区域是没有被柱形区域覆盖的,如果随机到的点在这个白色区域内,则需要再次随机,直到点落入到柱形区域内。采用这种随机选择方式,可以正确的依照权重来获取道具:
struct select_context
{
vector<item_weight_config> weights;
vector<float> relative_weights;
float p_max = 0;
void init()
{
relative_weights.resize(weights.size());
total_weight = 0;
for(std::uint32_t i = 0; i< weights.size(); i++)
{
total_weight += weights[i].item_weight;
}
for(std::uint32_t i = 0; i< weights.size(); i++)
{
relative_weights[i] = weights[i].item_weight * 1.0f/ total_weight;
p_max = std::max(p_max, relative_weights[i]);
}
}
};
item_id random_select_5(const select_context& ctx)
{
assert(ctx.weights.size());
while(true)
{
std::uint32_t random_index = std::uint32_t(std::floor((uniform_random(0, ctx.weights.size()))));
float random_weight = uniform_random(0, ctx.p_max);
if(random_weight <= ctx.relative_weights[random_index])
{
return ctx.weights[random_index].item_id;
}
}
}
在while循环内,单次循环体就可能得到最终的随机结果,这样成功的概率为所有柱形的面积除以最小包围矩形的面积,这个最小包围矩形的高度就是在init中计算的p_max,所以单次成功的概率为p=1/(n*p_max)。单次就成功只是理想情况,可能我们需要在while循环内执行多次才能获取随机选择结果,为了获取这个算法的复杂度我们需要计算while执行次数的期望。第k次才成功返回的概率为,所以总的次数期望为:
上面的后半部分可以转换为求,此时注意到:
将代入进入得到,所以最终的结果为:
所以期望执行的循环次数为n*p_max,这个的复杂度也是N的常数级别,比起前面介绍的二分查找慢了许多。
如果以某种机制来提高单次随机的成功概率p,则期望执行的循环次数会反比例的减小,当p被提升到1时,则只需要执行一次循环就可以获得随机选择的结果,接下来我们就来介绍如何通过巧妙的构造将p提升到1。
首先将前面的柱形分布图里所有的柱形高度都乘以n,这样会所有的柱形的面积累加值等于n,在这个缩放后的矩形里去随机选点等价于未缩放前:

然后再以y=1画一条线,这条线与底边x轴构成的矩形面积等于n:

由于所有柱形的面积也等于n,所以可以通过某些切割机制将原来的柱形切分为多个小柱形,然后再拼接为前面辅助线构造的矩形。上图中的褐色区域就是需要填充的空间,而辅助线y=1上方的那些彩色柱形区域就是要切割掉的空间。
我们首先从面积为2的那个柱形中切分出一个2/3面积的部分,然后将这部分填充到最右方的1/3上面,剩下的图形如下:

接下来继续从最左边剩下的4/3中切分出2/3给剩下的那个没有填充的1/3:

最后再从第二个柱形中切分出1/3给第一个柱形的剩下部分,这样就完成了柱形的切分再组装:

最后拼接出来的图形面积仍然是n,同时各个颜色的总面积等价于切分之前的总面积。但是此时已经不存在未被彩色柱形覆盖的区域了,所以在这个图形里随机选取一个点,计算出这个点对应的颜色,就可以获得随机选择的结果,不再需要多次抽样。就以上面的图来说,每个[k, k+1)区间内的小矩形数量最多为2,我们可以构造这样的一个数组来描述拼接的结构:
// 描述[k,k+1)的拼接信息
struct split_info
{
float self_prob = 0; // 每个小矩形的高度
uint32_t self_index = 0; // 每个小矩形的原始样本id
};
利用这个拼接数组,可以非常简单的单次随机即可计算出正确的结果:
uint32_t random_select_6(const std::vector<std::array<split_info, 2>>& rect_split_infos)
{
float temp_random_value = uniform_random(0, rect_split_infos.size());
uint32_t base_index = uint32_t(std::floor(temp_random_value));
float remain_random_value = temp_random_value - base_index;
if(remain_random_value <= rect_split_infos[base_index][0].self_prob)
{
return rect_split_infos[base_index][0].self_index;
}
else
{
return rect_split_infos[base_index][1].self_index;
}
}
上面的图里每个[k, k+1)区间内的小矩形数量最多为2目前来说只是一个特例,如果我们无法保证小矩形数量最多为2,上面的array<split_info, 2>要被替换为vector<split_info>,此时随机的实现就会变得不怎么高效了:
uint32_t random_select_7(const std::vector<std::vector<split_info>>& rect_split_infos)
{
float temp_random_value = uniform_random(0, rect_split_infos.size());
uint32_t base_index = uint32_t(std::floor(temp_random_value));
float remain_random_value = temp_random_value - base_index;
for(const auto& one_split_info: rect_split_infos[base_index])
{
if(remain_random_value <= one_split_info.self_prob)
{
return one_split_info.self_index;
}
remain_random_value -= one_split_info.self_prob;
}
}
这样的实现导致算法最坏执行时间被次级vector的最大容量所决定,对于这个最大容量我们唯一知道的信息是不会大于样本数量n,所以此时这个算法的复杂度仍然是样本数量n的线性复杂度。但是看前面介绍的简单样例只需要单次判断即可获得结果,如果有一种划分方法能保证最大容量为2的话,这个算法就只有常数时间复杂度,这样的最优情况非常吸引人。幸运的是我们可以证明存在这样的最大容量为2的划分,首先我们需要形式化的定义这个命题:
存在对于一个容量为n的vector<float> A, 如果accumulate(A.begin(), A.end(), 0) == A.size()且A中任意元素都大于0, 则存在一个std::vector<std::array<split_info, 2>> B,使得B.size()==n,且对于区间内[0, n)任意整数i,下面的条件都成立:
B[i][0].self_prob + B[i][1].self_prob == 1B[i][0].self_prob >=0 && B[i][1].self_prob >=0- 遍历
B中的每个元素C,再遍历C中的每个元素D,如果D.self_index==i,则累加D.self_prob,最后得到的累加值等于A[i]
这样的B就是我们所期望的最大容量为2的重整切割划分。接下来我们使用数学归纳法来证明对于上面的命题对于任意的正整数n都成立:
-
当样本数量为
1时,不需要执行切分合并,直接B[0][0].self_prob=1, B[0][0].self_index, B[0][1]={0,0}就可以满足需求,所以这个命题在n=1的时候成立。 -
假设对于样本数量为
k-1的A总是存在对应的B作为其最大容量为2的重整切割划分。当样本数量为k的时候,我们总是可以从A中选择两个索引i,j,使得A[i]>=A[j]且A[i]>=1>=A[j],此时从A[i]中切分一部分1-A[j]并填充到A[j]上,也就是B[j][0]={A[j],j}, B[j][1]={1-A[j], i},然后A[i]-=1-A[j], A[j]=1, swap(A[j], A.back()), swap(B[j], B.back())。此时我们将A[0],...A[k-2]构造为一个新的数组A2,将B[0],...B[k-2]构造出新的数组B2,这两个数组的大小为k-1。由于之前我们证明了对于n=k-1的时候总是存在一个B2是A2的重整切割划分,所以B2把里面的为j的self_index都重新赋值为k-1,然后再赋值到B[0],...B[k-2],此时的B也是对于原始A的最大容量为2的重整切割划分,即命题在n=k的时候也是成立。
上面的数学归纳法证明了对于任意的分布A我们总是可以构造一个最大容量为2的重整切割划分B,但是这里的构造算法不是很高效。每次缩减样本空间都需要扫描整个A数组来获取合符条件的两个索引i,j。下面我们来提出一种高效的算法来从A构造出B,这个构造方法可以避免每次扫描整个数组:
- 首先从
A数组构造两个新数组vector<pair<uint32_t, float>> C,D,C中存储原来高度大于1的所有元素的索引和高度,D中存储原来高度小于等于1的所有元素的索引和高度 - 每次从
D的末尾弹出一个元素E,- 如果
E.second==1,则代表此时不需要其他样本进行切分,执行B[E.first][0] = {E.first, E.second} - 如果
E.second<1,此时从C获取末尾元素F,从F中切除一部分1-E.second,使得E.first对应的区域被填充为1,以这个切分去构造B[E.first]
- 如果
- 判断
F.second是否小于等于1,如果小于等于1则将F从C的末尾转移到D的末尾 - 如果
D不为空,则回溯到步骤2再次执行;如果D为空,返回B作为结果。
下面的就是上述过程的cpp代码描述:
struct select_context
{
vector<item_weight_config> weights;
vector<std::array<split_info, 2>> rect_split_infos;
void init()
{
rect_split_infos.resize(weights.size());
std::vector<std::pair<uint32_t, float>> weights_gt_1;
std::vector<std::pair<uint32_t, float>> weights_le_1;
std::uint32_t total_weight = 0;
for(std::uint32_t i = 0; i< weights.size(); i++)
{
total_weight += weights[i].item_weight;
}
for(std::uint32_t i = 0; i< weights.size(); i++) // 对应步骤1
{
float relative_weight = weights.size() * weights[i].item_weight * 1.0f/ total_weight;
if(relative_weight > 1)
{
weights_gt_1.push_back(make_pair(i, relative_weight));
}
else
{
weights_le_1.push_back(make_pair(i, relative_weight));
}
}
while(!weights_le_1.empty()) // 对应步骤2
{
auto temp_le_1_back = weights_le_1.back();
weights_le_1.pop_back();
if(temp_le_1_back.second == 1) // 已经是1了 不需要再处理填充
{
rect_split_infos[temp_le_1_back.first][0].self_index = temp_le_1_back.first;
rect_split_infos[temp_le_1_back.first][0].self_prob = temp_le_1_back.second;
continue;
}
assert(!weights_gt_1.empty());
auto& temp_gt_1_back = weights_gt_1.back();
rect_split_infos[temp_le_1_back.first][0].self_index = temp_le_1_back.first;
rect_split_infos[temp_le_1_back.first][0].self_prob = temp_le_1_back.second;
rect_split_infos[temp_le_1_back.first][1].self_index = temp_gt_1_back.first;
rect_split_infos[temp_le_1_back.first][1].self_prob = 1- temp_le_1_back.second;
temp_gt_1_back.second -= 1 - temp_le_1_back.second;
if(temp_gt_1_back.second < 1) // 对应步骤3
{
weights_le_1.push_back(temp_gt_1_back);
weights_gt_1.pop_back();
}
}
}
};
上面的代码里有三个循环,每个循环的最大次数都被原始的输入数组大小限制住了,所以这个初始化流程相当于遍历了三次原始输入数组,其总时间复杂度为输入的线性复杂度。有了这个构造好的rect_split_infos之后,获取一个随机选择的值就很简单了:
uint32_t random_select_8(const select_context& ctx)
{
float temp_random_value = uniform_random(0, ctx.rect_split_infos.size());
uint32_t base_index = uint32_t(std::floor(temp_random_value));
float remain_random_value = temp_random_value - base_index;
if(remain_random_value <= ctx.rect_split_infos[base_index][0].self_prob)
{
return ctx.rect_split_infos[base_index][0].self_index;
}
else
{
return ctx.rect_split_infos[base_index][1].self_index;
}
}
上面介绍的方法也叫做Alias method,其更多的细节介绍可以参考https://www.keithschwarz.com/darts-dice-coins/。我自己也在Github上开源了一个简单的实现在https://github.com/huangfeidian/random_util/blob/main/include/choose_by_weight.h上,读者可以用这个基础代码作为参考实现。使用alias table算法就可以实现以常数时间来执行加权随机选择,所需的预处理时间也只是输入的线性时间。对于常见的十连抽一百连抽这种同一组配置会触发很多次随机选择的情况,这个算法是一个非常大的优化。
其实按照上面的代码流程构造出来的rect_split_infos可以保证rect_split_infos[i][0].self_index = i,可以利用这个性质来节省一点内存,不过这个性质对于算法的时间复杂度并没有任何改进,所以这里我就不去证明了,读者可以自己去证明这个性质。