Title: Chromium IPC In-Depth Analysis (Based on Mojo)
Author: Qter    Time: 2023-11-18 18:39
https://zhuanlan.zhihu.com/p/508362483
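This article covers the latest version of Chromium IPC and assumes basic knowledge of Mojo.
In the latest Chromium, the transport underneath IPC is a Mojo interface; ipc.mojom defines the Mojo interface that serves as the IPC channel.
Communication between the Browser and the Renderer works as in the figure from the original post: RenderFrameImpl in the Renderer process sends a message to RenderFrameHostImpl in the Browser process, and the message travels through the ipc.mojom interface.
IDL code of ipc.mojom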
module IPC.mojom;

import "mojo/public/interfaces/bindings/native_struct.mojom";
import "mojo/public/mojom/base/generic_pending_associated_receiver.mojom";

// Typemapped such that arbitrarily large IPC::Message objects can be sent and
// received with minimal copying.
struct Message {
  array<uint8> bytes;
  array<mojo.native.SerializedHandle>? handles;
};

interface Channel {
  // Informs the remote end of this client's PID. Must be called exactly once,
  // before any calls to Receive() below.
  SetPeerPid(int32 pid);

  // Transmits a classical Chrome IPC message.
  [UnlimitedSize]
  Receive(Message message);

  // Requests a Channel-associated interface.
  GetAssociatedInterface(
      mojo_base.mojom.GenericPendingAssociatedReceiver receiver);
};

// A strictly nominal interface used to identify Channel bootstrap requests.
// This is only used in `AgentSchedulingGroup` initialization.
interface ChannelBootstrap {};
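As the IDL shows, the Mojo interface is Channel. Its Receive method carries IPC messages: the sending side calls it and the receiving side handles the message in its implementation. GetAssociatedInterface requests a Channel-associated Mojo interface. The rest of this article is organized around Channel.
IPC Core
The main code lives under src/ipc.
The figure in the original post shows how the main classes relate:
Note the two interface classes Listener and Sender. They are the virtual interfaces for receiving and sending messages respectively; the actual work ends up in the MessagePipeReader class.
MessagePipeReader implements mojom::Channel. It is the class that sends and handles IPC messages.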
class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel {
 public:
  class Delegate {
   public:
    virtual void OnPeerPidReceived(int32_t peer_pid) = 0;
    virtual void OnMessageReceived(const Message& message) = 0;
    virtual void OnBrokenDataReceived() = 0;
    virtual void OnPipeError() = 0;
    virtual void OnAssociatedInterfaceRequest(
        mojo::GenericPendingAssociatedReceiver receiver) = 0;
  };
  .......
  bool Send(std::unique_ptr<Message> message);
  .......
 protected:
  void OnPipeClosed();
  void OnPipeError(MojoResult error);

 private:
  // mojom::Channel:
  void SetPeerPid(int32_t peer_pid) override;
  void Receive(MessageView message_view) override;
  void GetAssociatedInterface(
      mojo::GenericPendingAssociatedReceiver receiver) override;

  void ForwardMessage(mojo::Message message);

  // |delegate_| is null once the message pipe is closed.
  raw_ptr<Delegate> delegate_;  // Used to hand received messages to the handler.
  mojo::AssociatedRemote<mojom::Channel> sender_;  // AssociatedRemote used to send messages.
  std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
      thread_safe_sender_;
  mojo::AssociatedReceiver<mojom::Channel> receiver_;
  base::ThreadChecker thread_checker_;
  base::WeakPtrFactory<MessagePipeReader> weak_ptr_factory_{this};
};
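As the code shows, MessagePipeReader inherits from mojom::Channel and holds a mojo::AssociatedRemote<mojom::Channel> sender_ member that is used to send messages.
Sending IPC Messages
This section follows the sending flow for RenderFrameImpl in the Renderer process (the flow chart is in the original post).
Taking RenderFrameImpl as the example, the call ultimately reaches MessagePipeReader::Send, which sends the message to the other process over the Mojo channel by calling sender_->Receive.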
bool MessagePipeReader::Send(std::unique_ptr<Message> message) {
  CHECK(message->IsValid());
  TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Send",
                         message->flags(), TRACE_EVENT_FLAG_FLOW_OUT);
  absl::optional<std::vector<mojo::native::SerializedHandlePtr>> handles;
  MojoResult result = MOJO_RESULT_OK;
  result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
  if (result != MOJO_RESULT_OK)
    return false;

  if (!sender_)
    return false;

  base::span<const uint8_t> bytes(static_cast<const uint8_t*>(message->data()),
                                  message->size());
  sender_->Receive(MessageView(bytes, std::move(handles)));
  DVLOG(4) << "Send " << message->type() << ": " << message->size();
  return true;
}
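The shape of Send can be illustrated without Mojo. The following is only a minimal sketch under stated assumptions: ToyMessage, ToyRemote and ToyPipeWriter are hypothetical names invented here, the 4-byte type prefix is a made-up wire format, and a plain pointer stands in for the role sender_ plays in the real class. It shows the two early exits (invalid message, no remote end) and the hand-off of raw bytes to the remote's Receive.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for IPC::Message: a type tag plus a payload.
struct ToyMessage {
  uint32_t type = 0;
  std::vector<uint8_t> payload;
  bool IsValid() const { return type != 0; }
};

// Hypothetical stand-in for the remote end of the pipe. It only records
// the byte buffers it is handed.
class ToyRemote {
 public:
  void Receive(std::vector<uint8_t> bytes) {
    inbox_.push_back(std::move(bytes));
  }
  const std::vector<std::vector<uint8_t>>& inbox() const { return inbox_; }

 private:
  std::vector<std::vector<uint8_t>> inbox_;
};

class ToyPipeWriter {
 public:
  explicit ToyPipeWriter(ToyRemote* remote) : remote_(remote) {}

  // Simulates the pipe going away.
  void Close() { remote_ = nullptr; }

  bool Send(std::unique_ptr<ToyMessage> message) {
    assert(message && message->IsValid());  // stands in for CHECK
    if (!remote_)
      return false;  // nothing to send to
    // Made-up wire format: 4-byte little-endian type, then the payload.
    std::vector<uint8_t> bytes;
    for (int i = 0; i < 4; ++i)
      bytes.push_back(static_cast<uint8_t>((message->type >> (8 * i)) & 0xFF));
    bytes.insert(bytes.end(), message->payload.begin(),
                 message->payload.end());
    remote_->Receive(std::move(bytes));
    return true;
  }

 private:
  ToyRemote* remote_;  // not owned
};
```

In this sketch a caller would construct a ToyPipeWriter around a ToyRemote and call Send with a std::unique_ptr<ToyMessage>; after Close, Send returns false, as the real method does when sender_ is unset.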
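Handling IPC Messages
This section follows the handling flow for RenderFrameImpl in the Renderer process (the flow chart is in the original post).
1. Mojo calls MessagePipeReader::Receive, which then dispatches the message to the class that handles it.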
void MessagePipeReader::Receive(MessageView message_view) {
  if (message_view.bytes().empty()) {
    delegate_->OnBrokenDataReceived();
    return;
  }
  Message message(reinterpret_cast<const char*>(message_view.bytes().data()),
                  message_view.bytes().size());
  if (!message.IsValid()) {
    delegate_->OnBrokenDataReceived();
    return;
  }

  DVLOG(4) << "Receive " << message.type() << ": " << message.size();
  MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(
      message_view.TakeHandles(), &message);
  if (write_result != MOJO_RESULT_OK) {
    OnPipeError(write_result);
    return;
  }

  TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Receive",
                         message.flags(), TRACE_EVENT_FLAG_FLOW_IN);
  delegate_->OnMessageReceived(message);  // Dispatch through the delegate.
}
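The validate-then-dispatch pattern in Receive can be sketched on its own. This is a simplified illustration, not Chromium code: ToyDelegate, ToyReceive, IsValidToyMessage and the length-prefixed wire format are all assumptions made up for the example. Empty or malformed input goes to OnBrokenDataReceived; everything else reaches OnMessageReceived.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical wire format for this sketch only: a 4-byte little-endian
// payload length followed by exactly that many payload bytes.
constexpr size_t kHeaderSize = 4;

// Hypothetical, trimmed-down counterpart of MessagePipeReader::Delegate.
class ToyDelegate {
 public:
  virtual ~ToyDelegate() = default;
  virtual void OnMessageReceived(const std::vector<uint8_t>& payload) = 0;
  virtual void OnBrokenDataReceived() = 0;
};

// Returns true if the length field matches the number of payload bytes.
bool IsValidToyMessage(const std::vector<uint8_t>& bytes) {
  if (bytes.size() < kHeaderSize)
    return false;
  uint32_t declared = 0;
  for (size_t i = 0; i < kHeaderSize; ++i)
    declared |= static_cast<uint32_t>(bytes[i]) << (8 * i);
  return declared == bytes.size() - kHeaderSize;
}

void ToyReceive(const std::vector<uint8_t>& bytes, ToyDelegate* delegate) {
  assert(delegate);
  if (bytes.empty() || !IsValidToyMessage(bytes)) {
    delegate->OnBrokenDataReceived();
    return;
  }
  // Strip the header and hand the payload to the delegate.
  delegate->OnMessageReceived(
      std::vector<uint8_t>(bytes.begin() + kHeaderSize, bytes.end()));
}

// Delegate that records what it was told, for demonstration.
class RecordingDelegate : public ToyDelegate {
 public:
  void OnMessageReceived(const std::vector<uint8_t>& payload) override {
    ++delivered;
    last = payload;
  }
  void OnBrokenDataReceived() override { ++broken; }

  int delivered = 0;
  int broken = 0;
  std::vector<uint8_t> last;
};
```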
2. In the handler above, delegate_ refers to ChannelMojo, which is declared in ipc/ipc_channel_mojo.h.
class COMPONENT_EXPORT(IPC) ChannelMojo
    : public Channel,
      public Channel::AssociatedInterfaceSupport,
      public internal::MessagePipeReader::Delegate {
 public:
  ......
  bool Send(Message* message) override;
  ......
  void OnMessageReceived(const Message& message) override;
  ......
 private:
  ......
  const mojo::MessagePipeHandle pipe_;
  std::unique_ptr<MojoBootstrap> bootstrap_;
  raw_ptr<Listener> listener_;
  std::unique_ptr<internal::MessagePipeReader> message_reader_;
  ......
};
MessagePipeReader::Delegate is one of ChannelMojo's base classes, and ChannelMojo::OnMessageReceived forwards the message for handling:
void ChannelMojo::OnMessageReceived(const Message& message) {
  const Message* message_ptr = &message;
  TRACE_IPC_MESSAGE_SEND("ipc,toplevel", "ChannelMojo::OnMessageReceived",
                         message_ptr);
  listener_->OnMessageReceived(message);
  if (message.dispatch_error())
    listener_->OnBadMessageReceived(message);
}
As the code shows, the message is forwarded through listener_.
Here listener_ is in fact the ChannelProxy::Context, which passes itself to ChannelMojo when the Channel is created.
void ChannelProxy::Context::CreateChannel(
    std::unique_ptr<ChannelFactory> factory) {
  base::AutoLock channel_lock(channel_lifetime_lock_);
  DCHECK(!channel_);
  DCHECK_EQ(factory->GetIPCTaskRunner(), ipc_task_runner_);
  channel_ = factory->BuildChannel(this);  // Create the Channel.

  Channel::AssociatedInterfaceSupport* support =
      channel_->GetAssociatedInterfaceSupport();
  if (support) {
    thread_safe_channel_ = support->CreateThreadSafeChannel();

    base::AutoLock filter_lock(pending_filters_lock_);
    for (auto& entry : pending_io_thread_interfaces_)
      support->AddGenericAssociatedInterface(entry.first, entry.second);
    pending_io_thread_interfaces_.clear();
  }
}
3. Next, ChannelProxy::Context::OnMessageReceived handles the message.
bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
  // First give a chance to the filters to process this message.
  if (!TryFilters(message))
    OnMessageReceivedNoFilter(message);
  return true;
}
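The article does not show TryFilters itself, so the following is only a sketch of the general "filters first, then fall through" idea, assuming that the first filter to return true consumes the message. ToyContext, ToyMessage and the Filter alias are hypothetical names, and the fall-through path here merely counts messages.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for IPC::Message.
struct ToyMessage {
  int type;
};

class ToyContext {
 public:
  // A filter returns true when it has consumed the message.
  using Filter = std::function<bool(const ToyMessage&)>;

  void AddFilter(Filter filter) { filters_.push_back(std::move(filter)); }

  // Same shape as the method above: filters get the first chance, and
  // the message falls through only when none of them handles it.
  bool OnMessageReceived(const ToyMessage& message) {
    if (!TryFilters(message))
      OnMessageReceivedNoFilter(message);
    return true;
  }

  int unfiltered_count() const { return unfiltered_count_; }

 private:
  // Assumption of this sketch: the first filter that returns true wins.
  bool TryFilters(const ToyMessage& message) {
    for (const Filter& filter : filters_) {
      if (filter(message))
        return true;
    }
    return false;
  }

  // In this sketch the fall-through path only counts messages.
  void OnMessageReceivedNoFilter(const ToyMessage&) { ++unfiltered_count_; }

  std::vector<Filter> filters_;
  int unfiltered_count_ = 0;
};
```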
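In essence, the code above calls ChannelProxy::Context::OnMessageReceivedNoFilter, and there we find a PostTask.
Why handle the message by posting a task? IPC messages are not all handled on the same thread, so the message's routing_id is used to look up the matching TaskRunner. This ensures that each IPC message is handled on the correct thread. The code is as follows: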
bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
  GetTaskRunner(message.routing_id())
      ->PostTask(FROM_HERE,
                 base::BindOnce(&Context::OnDispatchMessage, this, message));
  return true;
}
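The routing_id to TaskRunner lookup can be illustrated with a single-threaded toy. This is a sketch under assumptions: ToyTaskRunner and ToyContext are hypothetical, queues stand in for real threads, and the fallback to a default runner for unregistered routes is an assumption of this example rather than something shown in the article.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <utility>

// Hypothetical single-threaded stand-in for a task runner: PostTask only
// queues the task, and it runs later when the owner calls RunPending.
class ToyTaskRunner {
 public:
  void PostTask(std::function<void()> task) {
    queue_.push_back(std::move(task));
  }

  // Runs everything queued so far and returns how many tasks ran.
  size_t RunPending() {
    size_t ran = 0;
    while (!queue_.empty()) {
      std::function<void()> task = std::move(queue_.front());
      queue_.pop_front();
      task();
      ++ran;
    }
    return ran;
  }

 private:
  std::deque<std::function<void()>> queue_;
};

class ToyContext {
 public:
  explicit ToyContext(std::shared_ptr<ToyTaskRunner> default_runner)
      : default_runner_(std::move(default_runner)) {}

  void AddRouteRunner(int32_t routing_id,
                      std::shared_ptr<ToyTaskRunner> runner) {
    runners_[routing_id] = std::move(runner);
  }

  // Assumption: routes without a dedicated runner use the default one.
  ToyTaskRunner* GetTaskRunner(int32_t routing_id) {
    auto it = runners_.find(routing_id);
    return it != runners_.end() ? it->second.get() : default_runner_.get();
  }

  // Same shape as the method above: pick the runner by routing_id and
  // post the dispatch there instead of running it inline.
  bool OnMessageReceivedNoFilter(int32_t routing_id,
                                 std::function<void()> dispatch) {
    GetTaskRunner(routing_id)->PostTask(std::move(dispatch));
    return true;
  }

 private:
  std::shared_ptr<ToyTaskRunner> default_runner_;
  std::map<int32_t, std::shared_ptr<ToyTaskRunner>> runners_;
};
```

Nothing runs at post time in this sketch; each handler executes only when its own runner drains its queue, which is the property that keeps a message on the thread that owns its route.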
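The posted task then runs ChannelProxy::Context::OnDispatchMessage, which is where listener_->OnMessageReceived is actually called to handle the message. Here listener_ is a Listener, and in this flow it is the AgentSchedulingGroup. The code is as follows: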
// Called on the listener's thread
void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
  if (!listener_)
    return;

  OnDispatchConnected();

#if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
  Logging* logger = Logging::GetInstance();
  if (message.type() == IPC_LOGGING_ID) {
    logger->OnReceivedLoggingMessage(message);
    return;
  }

  if (logger->Enabled())
    logger->OnPreDispatchMessage(message);
#endif

  listener_->OnMessageReceived(message);  // The listener is actually invoked here.
  if (message.dispatch_error())
    listener_->OnBadMessageReceived(message);

#if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
  if (logger->Enabled())
    logger->OnPostDispatchMessage(message);
#endif
}
4. AgentSchedulingGroup handles the message. Every message carries a routing_id, and that routing_id is used to find the matching listener.
bool AgentSchedulingGroup::OnMessageReceived(const IPC::Message& message) {
  DCHECK_NE(message.routing_id(), MSG_ROUTING_CONTROL);

  auto* listener = GetListener(message.routing_id());
  if (!listener)
    return false;

  return listener->OnMessageReceived(message);
}
5. The routing_id lookup above yields the RenderFrameImpl. IPC::Listener is one of RenderFrameImpl's base classes, so RenderFrameImpl::OnMessageReceived is called to handle the IPC message:
bool RenderFrameImpl::OnMessageReceived(const IPC::Message& msg) {
  // We may get here while detaching, when the WebFrame has been deleted. Do
  // not process any messages in this state.
  if (!frame_)
    return false;

  DCHECK(!frame_->GetDocument().IsNull());

  GetContentClient()->SetActiveURL(
      frame_->GetDocument().Url(),
      frame_->Top()->GetSecurityOrigin().ToString().Utf8());

  for (auto& observer : observers_) {
    if (observer.OnMessageReceived(msg))
      return true;
  }
  return false;
}
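The observer loop at the end of RenderFrameImpl::OnMessageReceived is a first-handler-wins chain. A minimal sketch of that pattern follows, assuming hypothetical ToyFrame, ToyObserver and TypeObserver classes; an attached_ flag stands in for the frame_ null check and raw pointers replace Chromium's observer list.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for IPC::Message.
struct ToyMessage {
  int type;
};

class ToyObserver {
 public:
  virtual ~ToyObserver() = default;
  // Returns true when the observer has handled the message.
  virtual bool OnMessageReceived(const ToyMessage& message) = 0;
};

class ToyFrame {
 public:
  void AddObserver(ToyObserver* observer) { observers_.push_back(observer); }

  // Stands in for the frame being detached (frame_ becoming null).
  void Detach() { attached_ = false; }

  bool OnMessageReceived(const ToyMessage& message) {
    if (!attached_)
      return false;  // do not process messages while detached
    for (ToyObserver* observer : observers_) {
      if (observer->OnMessageReceived(message))
        return true;  // first observer to handle it ends the loop
    }
    return false;
  }

 private:
  bool attached_ = true;
  std::vector<ToyObserver*> observers_;  // not owned
};

// Observer that handles exactly one message type and counts every
// message it was offered.
class TypeObserver : public ToyObserver {
 public:
  explicit TypeObserver(int handled_type) : handled_type_(handled_type) {}

  bool OnMessageReceived(const ToyMessage& message) override {
    ++seen;
    return message.type == handled_type_;
  }

  int seen = 0;

 private:
  int handled_type_;
};
```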
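Managing IPC::Listener
Many classes manage IPC::Listener objects: RenderThreadImpl, AgentSchedulingGroup, AgentSchedulingGroupHost, ChildThreadImpl, and so on.
This section focuses on AgentSchedulingGroup in the Renderer process, which stores its listeners in the listener_map_ member.
A routing_id and its listener are registered through AgentSchedulingGroup::AddRoute: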
void AgentSchedulingGroup::AddRoute(int32_t routing_id, Listener* listener) {
  DCHECK(!listener_map_.Lookup(routing_id));
  listener_map_.AddWithID(listener, routing_id);  // Store routing_id and listener.
  render_thread_.AddRoute(routing_id, listener);

  // See warning in `GetAssociatedInterface`.
  // Replay any `GetAssociatedInterface` calls for this route.
  auto range = pending_receivers_.equal_range(routing_id);
  for (auto iter = range.first; iter != range.second; ++iter) {
    ReceiverData& data = iter->second;
    listener->OnAssociatedInterfaceRequest(data.name,
                                           data.receiver.PassHandle());
  }
  pending_receivers_.erase(range.first, range.second);
}
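To make the routing table and the replay step concrete, here is a small sketch. It is not the Chromium implementation: ToyRouter, ToyListener and RecordingListener are hypothetical, interface requests are reduced to a name string, and queuing a request when no route exists yet is an assumption inferred from the "Replay" comment above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical, trimmed-down counterpart of IPC::Listener.
class ToyListener {
 public:
  virtual ~ToyListener() = default;
  virtual bool OnMessageReceived(const std::string& body) = 0;
  virtual void OnAssociatedInterfaceRequest(const std::string& name) = 0;
};

class ToyRouter {
 public:
  void AddRoute(int32_t routing_id, ToyListener* listener) {
    assert(listeners_.count(routing_id) == 0);  // stands in for DCHECK
    listeners_[routing_id] = listener;
    // Replay interface requests that arrived before the route existed.
    auto range = pending_.equal_range(routing_id);
    for (auto it = range.first; it != range.second; ++it)
      listener->OnAssociatedInterfaceRequest(it->second);
    pending_.erase(range.first, range.second);
  }

  // Assumption: requests for an unknown route are queued, not dropped.
  void GetAssociatedInterface(int32_t routing_id, std::string name) {
    auto it = listeners_.find(routing_id);
    if (it != listeners_.end())
      it->second->OnAssociatedInterfaceRequest(name);
    else
      pending_.emplace(routing_id, std::move(name));
  }

  // Looks up the listener by routing_id; returns false if none exists.
  bool OnMessageReceived(int32_t routing_id, const std::string& body) {
    auto it = listeners_.find(routing_id);
    if (it == listeners_.end())
      return false;
    return it->second->OnMessageReceived(body);
  }

  size_t pending_count() const { return pending_.size(); }

 private:
  std::unordered_map<int32_t, ToyListener*> listeners_;  // not owned
  std::multimap<int32_t, std::string> pending_;
};

// Listener that records everything it receives, for demonstration.
class RecordingListener : public ToyListener {
 public:
  bool OnMessageReceived(const std::string& body) override {
    messages.push_back(body);
    return true;
  }
  void OnAssociatedInterfaceRequest(const std::string& name) override {
    interfaces.push_back(name);
  }

  std::vector<std::string> messages;
  std::vector<std::string> interfaces;
};
```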
That concludes the article. If anything here is inaccurate, corrections are welcome!