远程消息投递

游戏服务器一般来说是由承担不同功能角色一组进程的集合,这些进程可以放在同一台物理机上,也可以分布式的存在于多台通过网络互联的物理机中。游戏服务器内管理着各种可以通信的对象,分布在这些进程之中。同时这些可通信的对象与进程之间的关系是动态的,可能会按照业务需求将某个对象从物理机器A中的某个进程切换到物理机器B中的某个进程。所以在游戏服务器中的对象间通信不能直接使用ip:port::target_id这种静态的配置形式,需要在业务层封装好一个基于对象名字的虚拟通道channel消息投递机制,以应对对象的进程间迁移。同时由于游戏内的玩家可能随时上线下线,所以对于玩家的某些消息还需要额外考虑其不在线的情况 ,使用持久化的数据库来确保玩家对这些持久化消息的有序接收。接下来我们将介绍一下mosaic_game中提供的面向不同应用场景的各种消息投递机制,以及相关应用组件。

在线消息投递

在线消息投递处理的是向目前在线的服务端entity,service发送rpc消息的过程。如果发送时目标不在线,则消息投递失败;如果在投递过程中目标下线,则消息投递失败。由于此类消息不保证能够可靠的通知到目标,所以此类消息一般都是用来执行客户端消息通知,不能依赖此类消息去修改entity的持久化数据。根据这个在线消息投递实现机制,又可以细分为三种子类型:

  1. 单播消息 只向一个目标进行消息投递
  2. 多播消息 向多个在线目标进行消息投递
  3. 广播消息 向所有在线目标进行消息投递

单播消息

在线单播消息投递是所有消息投递实现的基础,在mosaic_game中暴露出了如下接口来支持向指定的一个对象发送在线消息:

void json_server::call(std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> msg, enums::packet_cmd cur_packet_cmd, std::uint8_t cur_packet_detail_cmd)
{
	m_router->push_msg(m_local_name_ptr, dest, msg, enums::packet_cmd_helper::encode(cur_packet_cmd, cur_packet_detail_cmd));
}

dest字段代表投递目标的唯一标识符,为server_id::target_id形式的字符串,代表server_id服务器上的target_id对应的发送目标,后面的::target_id某些情况下可以省略。然后msg字段就是要发送的消息的字节流,这里使用shared_ptr<const std::string>是为了避免出现字符串的拷贝操作,同时也能更好的支持多播。

参数里剩下的cur_packet_cmdcur_packet_detail_cmd可以理解为消息类型和在这个消息下的消息子类型,编码时会将这两个字段合并为一个uint16进行处理:

enum class packet_cmd: std::uint8_t
{
	server_control = 0,
	client_to_game,
	game_to_client,
	server_rpc_msg,
	server_raw_msg,
	entity_msg,
	actor_migrate_msg,
	max,
};
struct packet_cmd_helper
{
	static std::uint16_t encode(packet_cmd in_packet_cmd, std::uint8_t in_cmd_detail)
	{
		std::uint16_t result = std::uint16_t(in_packet_cmd);
		result <<= 8;
		result += in_cmd_detail;
		return result;
	}

	static std::pair<packet_cmd, std::uint8_t> decode(std::uint16_t in_combine_cmd)
	{
		std::pair<packet_cmd, std::uint8_t> result;
		result.second = in_combine_cmd % 256;
		result.first = packet_cmd(in_combine_cmd / 256);
		return result;
	}
};

这个call接口只是对network_router::push_msg的一个简单转发,额外加上了当前发送者的进程标识符m_local_name_ptr,内部会根据这个dest找到合适的远程连接connection,并将消息添加到这个connection的发送队列中:

bool network_router::push_msg(std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data, std::uint16_t cmd)
{

	auto cur_proxy_resource = m_anchor_collection.find_proxy_for_anchor(*dest);
	if(!cur_proxy_resource)
	{
		m_logger->error("push_msg cant find anchor_resources from {} dest {}  data {}", *from, *dest, *data);

		return false;
	}
	auto cur_proxy_con = cur_proxy_resource->get_connection();
	if (cur_proxy_con)
	{
		if (push_msg(cur_proxy_con, from, dest, data, cmd))
		{
			return true;
		}
	}
	return cur_proxy_resource->try_push(from, dest, data, cmd);
}

dest找到对应的connection的逻辑由anchor_collection类型负责,这个类型记录了anchor锚点到网络投递资源的映射,对外暴露了名字绑定接口来添加两者之间的映射:

bool network_router::link_anchor_to_connection(const std::string& anchor_name, const net_connection* connection)
{
	auto cur_connection_iter = m_connection_resources.find(connection);
	if (cur_connection_iter == m_connection_resources.end())
	{
		return false;
	}
	if(m_anchor_collection.create_resource(anchor_name, connection, cur_connection_iter->second->output_channel))
	{
		cur_connection_iter->second->anchors.insert(anchor_name);
		return true;
	}
	else
	{
		return false;
	}
}

查询的时候查找以::分割的最长前缀去匹配。举个例子来说,如果server1::target_id找不到记录的话,则继续以server_1去查找记录,这样就可以让server_1这个connection_resource去代理所有的server_1::xxx形式的rpc目标。

有些时候传入的投递地址可能会是本进程的地址,此时查找connection_resource的话会失败,因为当前并没有为本进程创建一个connection,从而导致投递消息失败,所以上层在投递消息的时候要专门为这个本地地址做过滤。这里的代码在发现dest是一个本进程地址之后,会将这个数据直接放到主循环消息队列中:

bool network_router::push_msg(std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data, std::uint16_t cmd)
{
	if(dest->rfind(m_local_anchor_name, 0) == 0)
	{
		// 说明是本进程地址 直接推送数据到input_msg_queue
		network::con_msg_task local_msg_task;
		local_msg_task.first = {};
		local_msg_task.second = msg_task::construct(from, dest, data, cmd);
		m_input_msg_queue->push_msg(std::move(local_msg_task));
	}
	// 省略一些代码

}

找到connection之后的消息发送逻辑已经在之前的网络通信章节中介绍过了,读者可以回顾一下相关的内容来了解底层TCP传输细节,这里就不再介绍。当目标进程接收到了这个消息之后,主循环的on_msg回调首先解析出from,dest, msg, cmd这四个字段,然后根据cmd的类型与dest地址来做本进程的消息分发。由于具体的分发函数实现有点长,这里就只贴出space_server::on_msg中处理enums::packet_cmd::entity_msg的部分:

case enums::packet_cmd::entity_msg:
{
	if(!one_msg.dest)
	{
		m_logger->error("dest empty while handle rpc msg {}", *one_msg.data);
		return true;
	}
	
	utility::rpc_msg::call_result dispatch_result;

	auto real_dest = remove_local_anchor_prefix(*one_msg.dest);
	if(check_msg_forward(real_dest, one_msg))
	{
		return true;
	}
	dispatch_result = entity::entity_manager::instance().dispatch_entity_msg(real_dest, cur_cmd_detail.second, one_msg.data);
	if (dispatch_result != utility::rpc_msg::call_result::suc)
	{
		m_logger->error("fail to dispatch raw_msg dest {} sync_cmd {} with error {}",  *one_msg.dest, cur_cmd_detail.second, int(dispatch_result));
	}
	return true;
}

这里的remove_local_anchor_prefix相当于把server_id::entity_id形式的dest解析出entity_id部分,check_msg_forward的函数行为我们后面再介绍,剩下的逻辑就是entity_manager根据entity_id去找到对应的entity并调用on_entity_msg接口来处理本次数据:

utility::rpc_msg::call_result dispatch_entity_msg(const std::string& dest, std::uint8_t cmd, std::shared_ptr<const std::string> msg)
{
	auto cur_entity = get_entity(dest);
	if (!cur_entity)
	{
		return utility::rpc_msg::call_result::dest_not_found;
	}
	return cur_entity->on_entity_msg(cmd, msg);
}

这个on_entity_msg再根据消息的子类型来做格式解析,并最终执行到rpc的分发函数on_rpc_msg:

utility::rpc_msg::call_result server_entity::on_entity_msg(std::uint8_t cmd, std::shared_ptr<const std::string> msg_ptr)
{
	if(cmd == std::uint8_t(enums::entity_packet::json_rpc))
	{
		utility::rpc_msg e_msg;
		
		try
		{
			json msg = json::parse(*msg_ptr);
			msg.at("cmd").get_to(e_msg.cmd);
			msg.at("args").get_to(e_msg.args);
			auto from_iter = msg.find("from");
			if (from_iter != msg.end())
			{
				from_iter->get_to(e_msg.from);
			}
			auto err_iter = msg.find("err");
			if(err_iter != msg.end())
			{
				err_iter->get_to(e_msg.err);
			}
		}
		catch (std::exception& e)
		{
			m_logger->error("fail to decode rpc_msg {} error {}", *msg_ptr, e.what());
			return utility::rpc_msg::call_result::invalid_format;
		}
		return on_rpc_msg(e_msg);
	}
	
	return utility::rpc_msg::call_result::rpc_not_found;
}

on_rpc_msg之后的逻辑在rpc部分已经介绍过了,这里就不重复阐述。上面贴的代码对应的是space_server上的server_entity消息投递,其实与service_server上的base_service的消息投递机制基本没有差异,两者的on_rpc_msg机制是一样的,只不过一个被entity_manager中转分发,而另外一个被service_manager中转分发。类似的还有space_server上的manager_base,用来管理一些能够接受rpc的非server_entity单例数据:

utility::rpc_msg::call_result space_server::on_server_rpc_msg(const std::string& dest, const utility::rpc_msg& cur_rpc_msg)
{
	auto dispatch_result = entity::entity_manager::instance().dispatch_rpc_msg(dest, cur_rpc_msg);
	if(dispatch_result != utility::rpc_msg::call_result::dest_not_found)
	{
		return dispatch_result;
	}
	return manager_base::dispatch_rpc(dest, cur_rpc_msg);

}
void manager_base::init_managers(space_server* in_space_server)
{
	offline_msg_manager::instance().init(in_space_server);
	email_manager::instance().init(in_space_server);
	notify_manager::instance().init(in_space_server);
	rank_manager::instance().init(in_space_server);
	space_manager::instance().init(in_space_server);
}

不过serviceserver_entity在投递地址的表示上有很大的不同,server_entity的投递地址都是server_id::entity_id这个形式,而service的投递地址就只有service_type这个形式,没有绑定的server_id。这个解绑server_id的原因是为了支持service在不同的进程之间动态迁移从而实现负载均衡以及容灾,所以往特定service发消息的时候,只需要传递这个服务的名字即可作为投递地址:

void space_server::call_service( const std::string& service_name, const utility::rpc_msg& msg)
{
	auto dest_server = choose_server_for_service(service_name);
	if(dest_server.empty())
	{
		m_logger->warn("fail to find server for service {} msg {}", service_name,  json(msg).dump());
		return;
	}
	call_server(service_name, msg);
}

在一个service被创建的时候,每个进程都会接收到对应的创建通知,内部包含了所在的服务器信息,进程接收到这个消息之后会记录服务与进程之间的关联信息到m_services_by_id这个map成员上:

void space_server::on_notify_service_created(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const json& msg)
{
	std::string service_type;
	std::string service_id;
	std::string service_server;
	try
	{
		msg.at("service_id").get_to(service_id);
		msg.at("service_type").get_to(service_type);
		msg.at("service_server").get_to(service_server);
	}
	catch(const std::exception& e)
	{
		m_logger->error("on_notify_service_created fail to parse msg {} error {}", msg.dump(), e.what());
		return;
	}
	m_services_by_id[service_id] = std::make_pair(service_type, service_server);
}

注意这里的key不是service_type而是一个根据规则生成的service唯一id, 这样做的理由是为了支持同一个service_type创建多个实例来做负载均衡。当需要给一个service发消息的时候,会查询是否有这个服务以及是否已经绑定了进程,这个绑定信息记录在m_services_by_pref上,如果没有绑定则遍历m_services_by_id里这个service_type对应的所有实例,随机选择其中的一个来执行绑定:

std::string space_server::choose_server_for_service(const std::string& service_name)
{
	auto temp_iter_1 = m_services_by_pref.find(service_name);
	if(temp_iter_1 != m_services_by_pref.end())
	{
		return temp_iter_1->second.second;
	}
	std::vector<std::pair<std::string, std::string>> temp_services;
	temp_services.reserve(8);
	for(const auto& one_pair: m_services_by_id)
	{
		if(one_pair.second.first != service_name)
		{
			continue;
		}
		temp_services.push_back(std::make_pair(one_pair.first, one_pair.second.second));
	}
	if(temp_services.empty())
	{
		return {};
	}
	auto cur_ms = utility::timer_manager::now_ts();
	auto cur_result = temp_services[cur_ms % temp_services.size()];
	m_services_by_pref[service_name] = cur_result;
	m_router->link_anchor_to_connection(service_name, cur_result.second);
	m_logger->info("set server {}  service_id {} to the pref server of service_type {}", cur_result.second, cur_result.first, service_name);
	return cur_result.second;

}

绑定之后就会在network_router中使用link_anchor_to_connection来注册投递地址与网络连接之间的映射,这里使用绑定而不是每次发送消息时都随机从temp_service中抽取一个的理由是保证从一个进程发送到一个投递地址的消息的接收顺序与发送顺序一致。

server_entity对于动态迁移的需求比service大很多,因为可迁移的server_entity数量相对service数量来说大好几个数量级。同时server_entity的迁移触发的比service更加频繁,每次切换场景时都可能触发,而service的迁移时机只有动态扩缩容和容灾。所以对于可迁移的server_entity的消息投递不能采取与service一样的广播推送最新地址的机制,这个机制会导致需要同步的信息量太多。所以针对server_entity的高频迁移特性,其在线消息投递使用的是一种基于中转的消息投递机制,对于这些可迁移的server_entity,在创建的时候就会在所在进程创建一个专门为其做消息转发服务的relay_entity:

player_entity* account_entity::create_player_entity(const std::string& player_id, const json& player_doc)
{
	std::string cur_err;
	auto cur_relay_entity_id = std::to_string(get_server()->gen_unique_uint64());
	json::object_t relay_init_info;
	relay_init_info["dest_eid"] = player_id;
	relay_init_info["dest_game"] = get_server()->local_server_info().name;
	auto cur_relay_entity = get_server()->create_entity("relay_entity", cur_relay_entity_id, gen_online_entity_id(),relay_init_info, cur_err);
	if(!cur_relay_entity)
	{
		m_logger->error("fail to create relay_entity");
		return nullptr;
	}
	m_relay_entity = dynamic_cast<relay_entity*>(cur_relay_entity);
	json::object_t player_init_info;
	player_init_info["prop"] = player_doc;
	player_init_info["prop"]["base"]["account_anchor"] = *get_call_proxy();
	player_init_info["prop"]["base"]["gate_name"] = m_gate_name;
	player_init_info["is_ghost"] = false;
	player_init_info["space_id"] = std::string();
	player_init_info["call_proxy"] = *cur_relay_entity->get_call_proxy();
	auto cur_entity = get_server()->create_entity("player_entity", player_id, gen_online_entity_id(),player_init_info, cur_err);
}

在创建好了relay_entity之后再创建所需的server_entity,同时将relay_entity的消息投递地址绑定在server_entitycall_proxy上。由于relay_entity是不可迁移的,所以使用call_proxy发送消息可以保证单进程有序与这个relay_entity进行消息通信。这个relay_entity会记录对应的server_entity的最新进程地址,server_entity每次迁移之前都需要通知relay_entity进行进程地址清空,等到relay_entity返回了迁移确认之后这个server_entity才能开始真正的迁移。在server_entity迁移成功之后,再将最新绑定的进程信息发送给relay_entity进行重新绑定。

void relay_entity::request_migrate_begin(const utility::rpc_msg& msg, const std::string& game_id, const std::string& space_id, const std::string& union_space_id, const json::object_t& enter_info)
{
	if(!m_dest_actor)
	{
		m_logger->error("request_migrate_begin while dest_anchor empty dest_game {} dest_eid {}", m_dest_game, m_dest_eid);
		return;
	}
	utility::rpc_msg reply_msg;
	reply_msg.cmd = "reply_migrate_begin";
	reply_msg.args.push_back(game_id);
	reply_msg.args.push_back(space_id);
	reply_msg.args.push_back(union_space_id);
	reply_msg.args.push_back(enter_info);
	call_server(m_dest_actor, reply_msg);
	m_dest_actor.reset();
	m_dest_game = game_id;
	
}

void relay_entity::notify_migrate_finish(const utility::rpc_msg& msg, const std::string& game_id)
{
	if(m_dest_game != game_id)
	{
		m_logger->error("notify_migrate_finish while  game not match  empty dest_game {} dest_eid {} new_game_id {}", m_dest_game, m_dest_eid, game_id);
		return;
	}
	m_dest_actor = std::make_shared<std::string>(utility::rpc_anchor::concat(m_dest_game, m_dest_eid));
	if(m_dest_game == get_local_server_name())
	{
		auto dest_entity = entity_manager::instance().get_entity(m_dest_eid);
		if(dest_entity)
		{
			for(const auto& one_msg:m_cached_msgs)
			{
				auto cur_cmd_detail = enums::packet_cmd_helper::decode(one_msg.cmd);
				auto dispatch_result = dest_entity->on_entity_msg(cur_cmd_detail.second, one_msg.data);
				if(dispatch_result != utility::rpc_msg::call_result::suc)
				{
					m_logger->error("entity {} fail to dispatch cmd {} data {} err {}", m_dest_eid, cur_cmd_detail.second, *one_msg.data, std::uint8_t(dispatch_result));
				}
			}
			m_cached_msgs.clear();
		}
	}
	for(const auto& one_msg: m_cached_msgs)
	{
		auto cur_cmd_detail = enums::packet_cmd_helper::decode(one_msg.cmd);
		call_server(m_dest_actor, one_msg.data, cur_cmd_detail.first, cur_cmd_detail.second);
	}
	
	m_cached_msgs.clear();
}

relay_entity接收到一个转发请求之后,先检查对应server_entity的进程绑定信息,如果没有绑定则先放到m_cached_msgs队列中,否则直接向对应的绑定进程发消息:

void relay_entity::forward(const network::msg_task& cur_msg_task)
{
	if(!m_dest_actor)
	{
		m_cached_msgs.push_back(cur_msg_task);
	}
	else
	{
		auto cur_cmd_detail = enums::packet_cmd_helper::decode(cur_msg_task.cmd);
		if(m_dest_game == get_local_server_name())
		{
			auto dest_entity = entity_manager::instance().get_entity(m_dest_eid);
			if(dest_entity)
			{
				auto dispatch_result = dest_entity->on_entity_msg(cur_cmd_detail.second, cur_msg_task.data);
				if(dispatch_result != utility::rpc_msg::call_result::suc)
				{
					m_logger->error("fail to foward dispatch cmd {} data {} with result {}", cur_cmd_detail.second, *cur_msg_task.data, std::uint8_t(dispatch_result));
				}
			}
			else
			{
				m_logger->error("fail to find dest entity {}", m_dest_eid);
			}
		}
		else
		{
			call_server(m_dest_actor, cur_msg_task.data, cur_cmd_detail.first, cur_cmd_detail.second);
		}
		
	}
}

当重新绑定之后,还需要将m_cached_msgs全都按照接收顺序发送一遍。这样通过一个不可迁移的relay_entity我们就可以实现对可迁移server_entity的稳定有序消息投递了。

上面的relay_entity机制有一个小问题,就是消息发送方需要知道目标server_entitycall_proxy,如果无法获得这个call_proxy,则无法直接使用relay_entity机制。因此这里对于player_entity还有一个另外的在线通知机制,全局有一个login_service会记录所有在线玩家的account_entity地址,而account_entity也是一个不迁移的server_entity,可以保证其与relay_entity在同一个进程上。所以login_service上提供了一个request_call_online接口来执行向指定的player_id发送一个在线消息:

void login_service::request_call_online(const utility::rpc_msg& msg, const std::string& cur_player_id, const std::string& cmd, const std::vector<json>& args)
{
	auto cur_iter = m_online_players.find(cur_player_id);
	if(cur_iter == m_online_players.end())
	{
		m_logger->info("request_call_online not online fail to call {} cmd {} args {}", cur_player_id, cmd, json(args).dump());
		return;
	}
	utility::rpc_msg result_msg;
	result_msg.cmd = cmd;
	result_msg.args = args;
	auto cur_server = get_server();
	cur_server->call_server(cur_iter->second, result_msg);
}

account_entity接收到这个消息之后,会调用call_player来手动获取这个同进程的relay_entity并执行转发:

void account_entity::call_player(const utility::rpc_msg& msg)
{
	if(!m_relay_entity)
	{
		return;
	}
	
	m_relay_entity->forward(msg);
}

这样在只拥有目标玩家的entity_id的情况下,可以通过login_service中转到account_entity,再中转到relay_entity,最终中转到player_entity。这样做会涉及到三次网络消息的收发,相对于基于call_proxy的中转来说多了一次,所以最好还是能够以call_proxy的方式去执行消息通知。因此很多service上都会有一个map记录当前所有在线玩家的call_proxy,玩家登录完成之后会向这些service推送自己的call_proxy,下面的就是玩家的聊天组件向聊天服务推送在线状态的代码,维护在线玩家call_proxy的相关代码:

void player_chat_component::on_login(bool is_relay)
{
	if(is_relay)
	{
		return;
	}
	utility::rpc_msg notify_msg;
	notify_msg.cmd = "notify_player_login";
	notify_msg.set_args(m_owner->entity_id(), *m_owner->get_call_proxy());
	m_owner->call_service("chat_service", notify_msg);
	
}

void player_chat_component::on_logout()
{
	utility::rpc_msg notify_msg;
	notify_msg.cmd = "notify_player_logout";
	notify_msg.set_args(m_owner->entity_id());
	m_owner->call_service("chat_service", notify_msg);
	
}

在聊天服务上使用一个unordered_map来记录注册过来的在线call_proxy, 同时提供一个封装好的call_online_player来处理向在线玩家发消息的需求:

void chat_service::notify_player_login(const utility::rpc_msg& msg, const std::string& player_id, const std::string& call_proxy)
{
	m_online_players[player_id] = std::make_shared<const std::string>(call_proxy);
}

void chat_service::notify_player_logout(const utility::rpc_msg& msg, const std::string& player_id)
{
	m_online_players.erase(player_id);
}
void chat_service::call_online_player(const std::string& player_id, const std::string& cmd, const std::vector<json>& args)
{
	auto cur_iter = m_online_players.find(player_id);
	if(cur_iter == m_online_players.end())
	{
		return;
	}
	utility::rpc_msg cur_msg;
	cur_msg.cmd = cmd;
	cur_msg.args = args;
	auto cur_server = get_server();
	cur_server->call_server(cur_iter->second, cur_msg);
}

就这样每个有通知在线玩家需求的service都自己维护了在线玩家的call_proxy记录,这样就可以避免都通过login_service去执行消息通知,降低login_service的单点压力。

多播消息

多播消息这种一般都是社群系统会使用到,例如群组、队伍、帮派等社群结构,多播系统的最简实现可以就只需要三行代码:使用一个for循环来遍历投递目标的集合然后调用单播消息投递接口。不过这样的实现会导致发送的消息rpc_msg被重复打包多次,浪费很多cpu,因此在server上提供了一个避免了重复消息打包的优化版本:

void space_server::call_server_multi(const entity::server_entity* cur_entity, const utility::rpc_msg& msg, const std::vector<std::string>& targets)
{
	auto shared_msg = msg.to_bytes();
	call_server_multi(cur_entity, shared_msg, enums::entity_packet::json_rpc, targets);
	
}

void space_server::call_server_multi(const entity::server_entity* cur_entity, std::shared_ptr<const std::string> msg, enums::entity_packet cur_entity_packet, const std::vector<std::string>& targets)
{
	for (auto one_dest : targets)
	{
		call_server(cur_entity, std::make_shared<std::string>(one_dest), msg, cur_entity_packet);
	}
}

这里使用rpc_msg上的to_bytes接口来预先执行消息打包序列化,生成一个shared_ptr<string>,这样就只需要打包一次,同时这个打包好的数据的生命周期能够被引用计数自动托管,业务层就不需要考虑msg生命周期的具体细节了。

不过这里的for循环也有一个可以性能优化的点,循环体内部会创建发送目标的一个shared_ptr<string>,其实更优的结果是创建一个包含了这个targets数组里所有元素的一个大的shared_ptr<string>,然后push_msg发送的时候使用下面的一个结构来编码dest:

struct shared_string_view
{
	std::shared_ptr<std::string> parent;
	std::string_view str;
};

这样make_shared只需要执行一次就行了,避免了多次动态内存分配相关的cpu损耗,同时整体的生命周期都被shared_ptr托管了,RAII会自动处理这个parent的资源释放。

有了这个call_server_multi接口之后,外部的多播接口只需要中转一下就好了:

void group_service::group_broadcast(const group_resource* cur_group, const std::string& cmd, const std::vector<json>& args, const std::string& except_id, bool without_leader)
{
	const auto& group_anchors = cur_group->get_online(except_id, without_leader);
	utility::rpc_msg cur_msg;
	cur_msg.cmd = cmd;
	cur_msg.args = args;
	get_server()->call_server_multi(cur_msg, group_anchors);
}

void team_service::team_broadcast(const team_resource* cur_team, const std::string& cmd, const std::vector<json>& args, const std::string& except_id)
{
	std::vector<std::string> team_anchors;
	team_anchors.reserve(cur_team->m_prop.m_players.index().size());
	for(const auto& one_idx: cur_team->m_prop.m_players.index())
	{
		auto cur_player_ptr = cur_team->m_prop.m_players.get_slot(one_idx.second);
		if(cur_player_ptr->m_id == except_id)
		{
			continue;
		}
		team_anchors.push_back(cur_player_ptr->anchor());
	}
	utility::rpc_msg cur_msg;
	cur_msg.cmd = cmd;
	cur_msg.args = args;
	get_server()->call_server_multi(cur_msg, team_anchors);
}

广播消息

游戏内有些逻辑需要往所有的在线玩家发送消息,这个需求虽然类似于多播,但是使用login_service去遍历所有玩家去调用多播接口的话消息流量会爆炸,而且会导致这个单点的卡顿,广播频率高的话可能会影响登录与下线。此时我们注意到这样的暴力广播流程里,很多时候数据的流向是login_service->game_server::relay_entity->game_server::player_entity->gate_server->client,中间的两层game_server不负责任何逻辑,完全执行数据转发任务。在这种情况下可以考虑跳过game_server相关的两层,直接把要广播的数据直接发向gate_server,然后让gate_server去遍历当前进程上绑定的所有客户端来执行在线通知。在这样的优化设计下,数据链路缩短为login_service->gate_server->client,减少了两层中转,同时数据流量也变得可控了,之前需要发送的数据份数为在线玩家的数量,而现在则缩减为gate_server的数量,这样内网流量放大倍率从几万的量级降低到了几十,同时还极大的降低了login_service的负载。

实际项目使用过程中,广播消息不仅仅只有全服广播,还有门派广播、势力广播、场景广播等多种类型。这些类型虽然不需要给所有在线客户端发送通知消息,但是其通知的人员范围依然是一个成百上千的数量,利用之前的多播接口依然会有性能问题。所以对于这些广播操作,采取先发送给所有的gate_server然后再由gate_server进行过滤后广播的方式依然可以节省很多的内网流量,同样可以避免这个广播接口的负载。在考虑了这些自定义的广播组需求之后,我们需要在gate_server上增加一个unordered_map来记录每个广播组下面的人员:

// 每个群组对应的client
// 这里的key string_view 对应的str就是value里的shared_ptr 
std::unordered_map<std::string, std::unordered_map<std::string_view, std::shared_ptr<const std::string>>> m_clients_for_group;

玩家角色可以通过下面的两个接口来维护自己与group之间的关系:

void space_server::update_player_broadcast_group(const entity::player_entity* cur_player, const std::vector<std::string>& groups)
{
	auto cur_account_id = *cur_player->shared_account_id();
	json params;
	params["entity_id"] = cur_account_id;
	params["groups"] = groups;
	json msg;
	msg["cmd"] = "request_link_group";
	msg["param"] = params;
	m_router->push_msg(cur_player->get_gate_id(), m_local_name_ptr, {}, std::make_shared<std::string>(msg.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}

void space_server::clear_player_broadcast_group(const entity::player_entity* cur_player, const std::vector<std::string>& groups)
{
	auto cur_account_id = *cur_player->shared_account_id();
	json params;
	params["entity_id"] = cur_account_id;
	params["groups"] = groups;
	json msg;
	msg["cmd"] = "request_unlink_group";
	msg["param"] = params;
	m_router->push_msg(cur_player->get_gate_id(), m_local_name_ptr, {}, std::make_shared<std::string>(msg.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}

gate_server上提供这两个接口来处理上面的两条消息通知,维护内部的m_clients_for_group:

void on_request_link_group(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg);

void on_request_unlink_group(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg);

同时gate_server上暴露一个广播接口on_request_broadcast_groups,方便space_server来调用,这个接口还支持给多个group一起广播:

void gate_server::on_request_broadcast_groups(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
	std::vector<std::string> groups;
	std::string msg_detail;
	std::uint8_t msg_cmd;
	try
	{
		msg.at("groups").get_to(groups);
		msg.at("msg_detail").get_to(msg_detail);
		msg.at("msg_cmd").get_to(msg_cmd);
	}
	catch(const std::exception& e)
	{
		m_logger->error("on_request_broadcast_groups fail to parse {} with error {}", msg.dump(), e.what());
		return;
	}
	auto cur_shared_rpc_msg = std::make_shared<std::string>(std::move(msg_detail));
	for(const auto& group: groups)
	{
		auto temp_iter = m_clients_for_group.find(group);
		if(temp_iter == m_clients_for_group.end())
		{
			continue;;
		}
		for(const auto& one_entity_pair: temp_iter->second)
		{
			m_router->push_msg({}, one_entity_pair.second, cur_shared_rpc_msg, enums::packet_cmd_helper::encode(enums::packet_cmd::game_to_client, msg_cmd));
		}
	}
	
}

space_server上提供了广播消息的入口:

void space_server::send_msg_to_broadcast_groups(const std::vector<std::string>& groups, std::shared_ptr<const std::string>& rpc_msg)
{
	json params;
	params["groups"] = groups;
	params["msg_detail"] = *rpc_msg;
	params["msg_cmd"] = std::uint8_t(enums::entity_packet::json_rpc);
	json msg;
	msg["cmd"] = "request_broadcast_groups";
	msg["param"] = params;
	auto cur_shared_msg = std::make_shared<std::string>(msg.dump());
	for(const auto& one_gate_info: m_gate_entities)
	{
		m_router->push_msg(one_gate_info.first, m_local_name_ptr, {}, std::make_shared<std::string>(msg.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	}

}

而调用这个接口的目前只有专门处理广播的notify_manager:

void notify_manager::notify_broadcast_groups(const std::vector<std::string>& groups, const utility::rpc_msg& msg)
{
	auto cur_cmd = msg.cmd;
	std::shared_ptr<const std::string> shared_msg = msg.to_bytes();
	utility::rpc_msg stat_msg;
	stat_msg.cmd = "add_broadcast_group_stat";
	stat_msg.set_args(groups, cur_cmd, std::uint32_t(shared_msg->size()));
	m_server->call_service("notify_service", stat_msg);
	m_server->send_msg_to_broadcast_groups(groups, shared_msg);

}

目前的mosaic_game里只有全服聊天和门派聊天使用了notify_manager提供的这个接口来做广播:

void player_notify_component::send_msg_to_broadcast_groups(const std::vector<std::string>& groups, const utility::rpc_msg& msg)
{
	server::notify_manager::instance().notify_broadcast_groups(groups, msg);
}

void player_notify_component::send_msg_to_broadcast_group(const std::string& group, const utility::rpc_msg& msg)
{
	m_player->prop_proxy().notify().broadcast_group_send_ts().insert(group, utility::timer_manager::now_ts());
	std::vector<std::string> cur_groups;
	cur_groups.push_back(group);
	server::notify_manager::instance().notify_broadcast_groups(cur_groups, msg);
}

虽然广播给所有客户端占在线广播消息的绝大部分,但是还有一些情况广播数据并不传递到客户端,而是通知到player_entity这一层去执行一些逻辑,此时的数据传递就不能用前述的gate广播组来做。不过这里我们也可以使用类似的设计,将数据发送到所有的game_server,然后由game_server去遍历当前进程上的所有玩家执行广播消息通知。这部分的功能通过notify_service来提供:

void notify_service::notify_all_online(const utility::rpc_msg& msg, const std::string& cmd, const json::object_t& detail)
{
	db_logic::notify_msg new_msg;
	new_msg.msg.cmd = cmd;
	new_msg.msg.args = args;
	new_msg.doc_seq = m_online_msgs.size();
	m_online_msgs.push_back(new_msg);
	if(m_delay_broadcast_timer.valid())
	{
		return;
	}
	utility::rpc_msg cur_broadcast_msg;
	cur_broadcast_msg.cmd = "sync_add_online_msg";
	cur_broadcast_msg.set_args(new_msg);
	get_server()->call_server_multi(cur_broadcast_msg, m_broadcast_managers);
}

这个接口会将广播数据通知到所有注册过来的notify_manager上,由于在每个space_server上都会创建一个notify_manager,所以等价于把这个消息广播到了space_server。当space_server上的notify_manager接收到这个请求之后,会遍历当前进程上的所有玩家来通知去查收最新的全服广播消息:

void notify_manager::sync_add_online_msg(const utility::rpc_msg& data, const json& new_online_msg)
{
	m_logger->info("sync_add_online_msg {}", new_online_msg.dump());
	db_logic::notify_msg temp_msg;
	try
	{
		new_online_msg.get_to(temp_msg);
	}
	catch(const std::exception& e)
	{
		m_logger->error("sync_add_online_msg fail for msg {}", new_online_msg.dump());
		return;
	}
	m_online_msgs.push_back(temp_msg);

	auto cur_players = entity::entity_manager::instance().get_entities_by_exact_type<
		entity::player_entity>();
	for(auto one_player: cur_players)
	{
		one_player->dispatcher().dispatch(enums::event_category::notify, std::string("online"));
	}
}

这里的获取所有在线玩家的实现是一个可以优化的点,内部会使用dynamic_cast来将server_entity转换到player_entity,如果这个有比较明显的性能瓶颈的话,推荐在notify_manager上维护好一个单独的在线player_entity的集合,这样就可以避免每次都执行这个get_entities操作。

玩家身上的player_notify_component注册了这个广播数据的接收,并重定向到rpc的处理:

void player_notify_component::event_listener(const utility::enum_type_value_pair& ev_cat, const std::string& detail)
{
	if(ev_cat != utility::enum_type_value_pair(enums::event_category::notify))
	{
		return;
	}
	m_owner->logger()->info("player_notify_component event_listener event {} value {} detail {}", ev_cat.enum_type, ev_cat.enum_value, detail);
	if(detail == "all")
	{
		handle_online_msgs();
		handle_db_msgs();
	}
	else if(detail == "online")
	{
		handle_online_msgs();
	}
	else if(detail == "db")
	{
		handle_db_msgs();
	}
}

void player_notify_component::handle_online_msgs()
{
	const auto& online_msgs = server::notify_manager::instance().online_msgs();
	if(online_msgs.size() <= m_player->prop_data().notify().online_seq_read())
	{
		return;
	}
	for(auto i = m_player->prop_data().notify().online_seq_read(); i< online_msgs.size(); i++)
	{
		on_new_notify(online_msgs[i], true);
	}
}

注意到这里我们还给这个消息添加了一个唯一递增序列号,玩家自身也记录一个不存库的属性来表明自身已经读取到哪一个在线信息了,玩家每处理一个在线信息就对这个属性进行更新:

void player_notify_component::on_new_notify(const db_logic::notify_msg& new_msg, bool is_online)
{
	m_owner->logger()->info("on_new_notify {}", json(new_msg).dump());
	if(is_online)
	{
		if(m_player->prop_proxy().notify().online_seq_read().get() < new_msg.doc_seq)
		{
			m_player->prop_proxy().notify().online_seq_read().set(new_msg.doc_seq);
			m_player->on_rpc_msg(new_msg.msg);
		}
		else
		{
			m_owner->logger()->error("duplicated on_new_notify {}", json(new_msg).dump());
		}
	}
	else
	{
		if(m_player->prop_proxy().notify().db_seq_read().get() < new_msg.doc_seq)
		{
			m_player->prop_proxy().notify().db_seq_read().set(new_msg.doc_seq);
			m_player->on_rpc_msg(new_msg.msg);
		}
		else
		{
			m_owner->logger()->error("duplicated on_new_notify {}", json(new_msg).dump());
		}
	}
}

这里的序列号主要是为了避免广播消息在同一个player_entity上重复执行,下面就是一种可能出现重复处理消息通知的时间线

T1 notify_service::notify_all_online msg_1
T2 space_server_1::notify_manager::sync_add_online_msg msg_1
T3 space_server_1::player_A::on_new_notify msg_1
T4 space_server_1::player_A::migrate_out
T5 space_server_2::player_A::migrate_in
T6 space_server_2::notify_manager::sync_add_online_msg msg_1

player_Aspace_server_1处理了一条广播消息msg_1之后发生了迁移,迁移到了space_server_2之后,这个msg_1才传递到space_server_2::notify_manager上,此时如果不判断消息的序列号的话,就会出现消息的重复通知。

除了重复通知之外,还可能出现消息的通知丢失问题,主要出现在玩家的迁移过程中。此时玩家的entity被销毁,其数据正在网络中中转,任意一个space_server都没有其对应的entity,因此消息会通知不到。为了解决这个问题,玩家在迁移结束之后会重新拉取一下notify_manager里存储的消息,判断是否需要处理:

bool player_notify_component::init(const json& data)
{
	m_player = dynamic_cast<player_entity*>(m_owner);
	if(!m_player)
	{
		return false;
	}
	m_owner->dispatcher().add_listener(enums::event_category::notify, &player_notify_component::event_listener, this);
	m_player->login_dispatcher().add_listener(&player_notify_component::on_login, this);
	m_player->logout_dispatcher().add_listener(&player_notify_component::on_logout, this);
	// 下面这行负责处理迁移完成之后的消息拉取
	m_owner->migrate_in_finish_dispatcher().add_listener(
		&player_notify_component::on_migrate_in_finish, this);
	return true;
}

void player_notify_component::on_migrate_in_finish()
{
	handle_db_msgs();
	handle_online_msgs();
}

离线消息投递

本章前面的内容介绍的都是如何向一个或多个在线的客户端发送消息,其核心在于在线,如果玩家不在线或者中途断线再上线,这些消息他就接收不到。所以这类在线消息通知只能用于一些提示性的业务,消息的接收与否不能影响服务端的逻辑正确性。如果我们需要向一个或多个玩家发送一些保证能接收到的消息,前述的机制就无法使用了,必须引入一种依赖于数据库的消息接收确认机制,来保证这个消息在玩家上线后能够及时的处理。

单播消息

跟在线消息投递一样,离线单播消息也是离线消息投递的基础。确保消息被接收我们可以仿照TCPACK机制,每个玩家的离线消息地址当作一个先进先出的队列,这个地址里接收到的数据会按照到达序分配一个递增流水号,同时将这条数据进行存库。然后玩家自己身在线后定期从离线消息队列中拉取头部的若干个消息进行处理,并删除已经处理完成的数据。

mosaic_game中也的确是这样实现的,在数据库中创建了一个单独的表OfflineMsg来存储所有玩家的离线消息通知,这个库里的每条消息都有一个entity_id字段代表对应的玩家id,同时有一个doc_seq字段代表这条消息的唯一序列号:

"OfflineMsg": [
	[
		[["entity_id", 1]],
		{

		}
	],
	[
		[["entity_id", 1], ["doc_seq", 1]],
		{
			"unique": true
		}
	]
]

这个数据库的操作都被封装到了entity_db_msg_manager_base中,在cpp中定义了消息的完整格式:

struct entity_db_msg
{
	std::string entity_id;
	json::object_t detail;
	std::uint64_t doc_seq;
	std::uint64_t ts;
	NLOHMANN_DEFINE_TYPE_INTRUSIVE(entity_db_msg, entity_id, detail, doc_seq, ts)
};

对于每个entity_id, 其doc_seq0的行会作为一个元数据行,里面有一个额外的字段used_seq代表这个玩家的离线消息里使用的最大流水号,每次向这个玩家发送离线消息的时候,需要先查询这条doc_seq==0的数据里记录的used_seq:

void entity_db_msg_manager_base::add_msg(const std::string& entity_id,const json::object_t& detail)
{
	std::shared_ptr<entity_db_msg> cur_entity_db_msg = std::make_shared<entity_db_msg>();
	cur_entity_db_msg->entity_id = entity_id;
	cur_entity_db_msg->detail = detail;
	cur_entity_db_msg->doc_seq = 0;
	cur_entity_db_msg->ts = utility::timer_manager::now_ts();
	auto cur_db_callback = [cur_entity_db_msg, this](const json& db_reply)
	{
		on_query_seq_back(cur_entity_db_msg, db_reply);
	};
	tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::modify_update, std::string{}, "", collection_name());
	json query_doc, db_doc;
	query_doc["entity_id"] = meta_doc_id(entity_id);
	query_doc["doc_seq"] = 0;

	db_doc["$inc"]["used_seq"] = 1;
	auto cur_modify_task = tasks::db_task_desc::modify_task::modify_one(cur_task_base, query_doc, db_doc, true, true);
	run_db_task(cur_modify_task, cur_db_callback);
}

由于这个玩家的doc_seq==0的行可能还没有创建,所以查询的时候使用的接口是modify_one,代表如果没有的话就以默认值来创建。在这个db操作的回调on_query_seq_back,使用最新递增之后作为这条消息的序列号,最后才能插入到数据库中:

cur_entity_db_msg->doc_seq = used_seq;

auto cur_db_callbak = [cur_entity_db_msg, this](const json& db_reply)
{
	on_add_msg_back(cur_entity_db_msg, db_reply);
};
tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::insert_one, std::string{}, "", collection_name());
json db_doc = *cur_entity_db_msg;

auto cur_insert_task = tasks::db_task_desc::insert_task::insert_one(cur_task_base, db_doc);
run_db_task(cur_insert_task, cur_db_callbak);

插入完成之后,还需要执行一个通知操作,这样如果这个玩家在线的话就会去立即拉取数据库中存好的通知信息:

void entity_db_msg_manager_base::on_add_msg_back(std::shared_ptr<entity_db_msg> cur_entity_db_msg, const json& db_reply)
{
	m_logger->info("on_add_msg_back for msg {} db_reply {}", json(*cur_entity_db_msg).dump(), db_reply.dump());
	notify_pull_msg(cur_entity_db_msg);
	
}

void offline_msg_manager::notify_pull_msg(std::shared_ptr<db_logic::entity_db_msg> cur_msg)
{
	utility::rpc_msg notify_msg;
	notify_msg.cmd = "request_call_online";
	std::vector<json> notify_cmd_args;
	notify_cmd_args.push_back(cur_msg->doc_seq);
	notify_msg.set_args(cur_msg->entity_id, std::string("notify_pull_offline_msg"), notify_cmd_args);
	m_server->call_service("login_service", notify_msg);
}

这里由于不知道这个玩家的在线call_proxy,所以只能委托到login_service去执行在线通知,所以login_service的压力还是比较大的。

注意到这个通知消息并没有将当前的entity_db_msg打包过去,只打包了序列号,这是因为要保证在OfflineMsg中一个entity_id对应的数据要严格按照序列号来处理。在我们目前的设计中,一次add_msg会触发两次数据库写入操作,然后再加上一次login_service->relay_entity->player_entity的双层转发操作,这样的多次异步过程在多进程结构中是无法保证player_entity接收到的消息是按序到达的。举个例子来说space_server_1space_server_2对同一个entity_id执行add_msg操作,space_server_1on_query_seq_back的返回值里序列号递增为了2,而 space_server_2on_query_seq_back的返回值里序列号递增为了3。然后在后续的多个网络发送中,可能会出现3这个notify_pull_offline_msg请求优先到达player_entity上,如果此时立即把带过来的3对应的通知消息处理的话,就会导致之前设定的严格按照递增序处理离线消息这个规则被违反。

player_entity接收到这个只带序号的消息通知之后,会立即再拉取OfflineMsg数据库中的未处理数据:

void player_offline_msg_component::notify_pull_offline_msg(const utility::rpc_msg& msg, std::uint64_t cur_msg_seq)
{
	auto temp_iter = m_done_msg_seqs.find(cur_msg_seq);
	if(temp_iter != m_done_msg_seqs.end())
	{
		// 这条消息之前已经处理过了
		return;
	}
	m_remain_msg_seqs.insert(cur_msg_seq);
	if(!m_is_pulling_msg)
	{
		pull_msg_impl();
	}
}

void player_offline_msg_component::pull_msg_impl()
{
	m_owner->logger()->info("entity {} pull_msg_impl ", m_owner->entity_id());
	m_is_pulling_msg = true;
	std::function<void(const std::string&, const std::vector<db_logic::entity_db_msg>& )> pre_cb = std::bind(&player_offline_msg_component::handle_new_offline_msgs, this, std::placeholders::_1, std::placeholders::_2);
	server::offline_msg_manager::instance().pull_msg(m_owner->entity_id(), 10, m_owner->convert_callback(pre_cb));
}

当拉取到新的未处理数据之后,按照顺序处理拉取的数据,处理完成之后删除已处理的,然后再执行一次拉取:

std::vector<std::uint64_t> temp_msg_seqs;
for(const auto& one_msg: result_msgs)
{
	if(m_done_msg_seqs.find(one_msg.doc_seq) != m_done_msg_seqs.end())
	{
		// 出现数据已经处理但是db还未完全删除 但是新的一次pull 又把数据拉出来的情况
		temp_msg_seqs.push_back(one_msg.doc_seq);
		continue;
	}
	m_done_msg_seqs.insert(one_msg.doc_seq);
	m_remain_msg_seqs.erase(one_msg.doc_seq);
	temp_msg_seqs.push_back(one_msg.doc_seq);
	utility::rpc_msg cur_rpc;
	try
	{
		one_msg.detail.at("cmd").get_to(cur_rpc.cmd);
		one_msg.detail.at("args").get_to(cur_rpc.args);
	}
	catch(const std::exception& e)
	{
		m_owner->logger()->error("fail to decode offline msg {}", json(one_msg.detail).dump());
		continue;
	}
	m_owner->rpc_owner_on_rpc(cur_rpc);
}
server::offline_msg_manager::instance().del_msg(m_owner->entity_id(), temp_msg_seqs);
pull_msg_impl();

当某次拉取数据得到的是空数据之后,才停止拉取。但是这里又有一个异步导致的问题,相关函数调用的时机如下所示:

T1: notify_pull_offline_msg 2 -> pull_msg_impl

T2: handle_new_offline_msgs [2] ->pull_msg_impl

T3: notify_pull_offline_msg 3

T4: handle_new_offline_msgs []

T1时刻玩家收到了序列号2的通知,此时发起了一次未读队列拉取请求pull_msg_implT2时刻处理完序列号2的数据之后再发起了一次未读队列拉取请求,T3时刻又有一个新的序列号3过来。但是这个未读队列在T2拉取时并没有读取到3的数据,导致T4时刻返回了空集合,判定为数据处理结束,这样就导致数据3停留在数据库之中,直到下次pull_msg_impl被执行。所以这里会先用一个std::unordered_set<std::uint64_t> m_remain_msg_seqs;来存储所有接收到的未处理消息序列号,只有当pull_msg_impl的结果为空且m_remain_msg_seqs才停止拉取,如果只是pull_msg_impl则开启一个短间隔的计时器去延迟拉取:

if(result_msgs.empty())
{
	if(m_remain_msg_seqs.empty())
	{
		m_is_pulling_msg = false;
		return;
	}
	else
	{
		m_owner->add_timer_with_gap(std::chrono::seconds(1), [this]()
		{
			pull_msg_impl();
		});
		return;
	}
	
}

上面介绍的是消息不需要保存历史记录时的处理机制,如果消息被处理之后不能立即删除,则玩家身上需要记录一个存库的字段,来表明当前已经处理的离线数据的最大流水号,查询的时候需要使用这个最大已读流水号去过滤。同时还需要一个存库的字段去记录当前最大未读流水号,这样能够更快的知道还剩多少消息未处理。这种带历史记录的离线保序消息处理的细节可以参考邮件系统里player_email_component里的相关逻辑。

多播消息

多播消息的在线投递机制非常简单,就是对单播消息的在线投递的一个循环调用。但是多播消息的离线投递机制则无法直接复用单播消息的离线投递机制,因为这个消息不能被单人处理后直接从数据库中删除。所以每个人需要对这个多播消息做一个最大已处理序号的记录,各自拉取消息的时候需要加入已读最大值作为查询条件。这个多播消息的典型应用场景就是群组聊天的通知,由于这部分的内容已经被聊天和群组覆盖了,实现细节上与邮件系统大同小异,所以这里就不再详细介绍。

广播消息

广播消息的离线推送实现上基本照抄自广播消息的在线player_entity推送,相关rpc流程基本是类似的,只不过在notify_service发出广播之前,需要先将数据存库,每一个消息都有一条数据库记录:

void notify_service::notify_all_db(const utility::rpc_msg& msg, const std::string& cmd, const json::array_t& args)
{
	db_logic::notify_msg new_msg;
	new_msg.msg.cmd = cmd;
	new_msg.msg.args = args;
	new_msg.doc_seq = 0;
	if(m_next_db_seq == 0)
	{
		m_db_msgs.push_back(new_msg);
		return;
	}
	new_msg.doc_seq = m_next_db_seq;
	m_next_db_seq++;
	m_db_msgs.push_back(new_msg);

	save_db_seq();
	tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::insert_one, std::string{}, "", m_collection_name);
	json  db_doc = new_msg;
	auto cur_db_callback = [this](const json& db_reply)
	{
		on_insert_seq_back(db_reply);
	};
	auto cur_insert_task = tasks::db_task_desc::insert_task::insert_one(cur_task_base, db_doc);
	get_server()->call_db(cur_insert_task->to_json(), cur_db_callback);
	if(m_delay_broadcast_timer.valid())
	{
		return;
	}
	utility::rpc_msg cur_broadcast_msg;
	cur_broadcast_msg.cmd = "sync_add_db_msg";
	cur_broadcast_msg.set_args(new_msg);
	get_server()->call_server_multi(cur_broadcast_msg, m_broadcast_managers);
}

值得注意的是这里的m_db_msgs数组中并不会存储所有的的离线消息,而只存储本次服务器启动之后加入的离线消息。所以玩家在获取指定序列号的离线消息的时候,这条消息可能并不在notify_managerm_db_msgs数组中,需要去数据库里拉取这些数据:

void player_notify_component::handle_db_msgs()
{
	if(m_player->prop_data().notify().db_seq_read() + 1 >= server::notify_manager::instance().next_db_seq())
	{
		return;
	}
	std::uint64_t cached_db_begin = server::notify_manager::instance().next_db_seq();
	const auto& cached_msgs = server::notify_manager::instance().db_msgs();
	if(!cached_msgs.empty())
	{
		cached_db_begin = cached_msgs.front().doc_seq;
	}
	m_owner->logger()->debug("handle_db_msgs self seq {} cached_db_begin {}", m_player->prop_data().notify().db_seq_read(), cached_db_begin);
	if(m_player->prop_data().notify().db_seq_read() + 1 < cached_db_begin)
	{
		// pull from db
		std::uint64_t pull_seq_end = cached_db_begin - 1;
		std::uint64_t pull_seq_begin = m_player->prop_data().notify().db_seq_read() + 1;
		if(pull_seq_end - pull_seq_begin > 10)
		{
			pull_seq_end = pull_seq_begin + 10;
		}
		pull_unread_db_msgs(pull_seq_begin, pull_seq_end);
	}
	else
	{
		// pull from memory
		if(cached_msgs.empty())
		{
			return;
		}
		
		for(auto i = m_player->prop_data().notify().db_seq_read() - cached_msgs.front().doc_seq + 1; i< cached_msgs.size(); i++)
		{
			on_new_notify(cached_msgs[i], false);
		}
	}
}

notify().db_seq_read()里存储的是最大离线消息已处理序列号,这个属性是要存库的,而在线消息notify().online_seq_read()则不存库。 handle_db_msgs在被触发的时候会检查下一个要处理的数据是否在notify_manager中,不在的话拉取后续的10条数据来渐进处理:

void player_notify_component::pull_unread_db_msgs(std::uint64_t seq_begin, std::uint64_t seq_end)
{
	const std::string notify_db_name = "NotifyMsg";
	auto cur_reply_cb = m_owner->convert_callback(m_owner->add_callback([=](const json& db_result)
	{
		this->pull_unread_db_msgs_cb(seq_begin, seq_end, db_result);
	}));
	tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::find_multi, std::string{}, "", notify_db_name);
	json query_filter, sort;

	query_filter["doc_seq"]["$lte"] = seq_end;
	query_filter["doc_seq"]["$gte"] = seq_begin;
	sort["doc_seq"] = 1;

	auto cur_find_task = tasks::db_task_desc::find_task::find_multi(cur_task_base, query_filter, seq_end - seq_begin + 1, {}, 0, tasks::db_task_desc::read_prefer_mode::secondary, sort );
	m_owner->call_db(cur_find_task->to_json(), cur_reply_cb);
}

在数据查询回来之后再使用之前在线消息分发使用的on_new_notify来分发消息,这个共享接口里通过一个bool值来区分是在线消息和离线消息。处理完一个批次之后会再使用handle_db_msgs来检查是否还有后续消息要处理:

void player_notify_component::pull_unread_db_msgs_cb(std::uint64_t seq_begin, std::uint64_t seq_end, const json& db_reply)
{
	std::vector<db_logic::notify_msg> result_docs;
	std::string error;
	tasks::db_task_desc::task_reply cur_reply;
	// 省略反序列化相关代码
	for(const auto& one_msg: result_docs)
	{
		on_new_notify(one_msg, false);
	}
	if(seq_end > m_player->prop_data().notify().db_seq_read())
	{
		m_player->prop_proxy().notify().db_seq_read().set(seq_end);
	}
	handle_db_msgs();
}