Qter posted on 2023-11-18 18:39:34

Chromium IPC In Depth (Based on Mojo)

https://zhuanlan.zhihu.com/p/508362483

This article covers the latest version of Chromium IPC and assumes basic familiarity with Mojo. In current Chromium, the underlying IPC transport is a Mojo interface: ipc.mojom defines the Mojo interface that serves as the IPC channel.

Communication between the Browser and Renderer processes

https://pic2.zhimg.com/80/v2-68354460173125825ff059f6902d6e7d_720w.webp

As the figure above shows, RenderFrameImpl in the Renderer process sends messages to RenderFrameHostImpl in the Browser process, and the traffic in between goes through the interface defined in ipc.mojom.

IDL code of ipc.mojom

module IPC.mojom;

import "mojo/public/interfaces/bindings/native_struct.mojom";
import "mojo/public/mojom/base/generic_pending_associated_receiver.mojom";

// Typemapped such that arbitrarily large IPC::Message objects can be sent and
// received with minimal copying.
struct Message {
  array<uint8> bytes;
  array<mojo.native.SerializedHandle>? handles;
};

interface Channel {
  // Informs the remote end of this client's PID. Must be called exactly once,
  // before any calls to Receive() below.
  SetPeerPid(int32 pid);

  // Transmits a classical Chrome IPC message.
  Receive(Message message);

  // Requests a Channel-associated interface.
  GetAssociatedInterface(
      mojo_base.mojom.GenericPendingAssociatedReceiver receiver);
};

// A strictly nominal interface used to identify Channel bootstrap requests.
// This is only used in `AgentSchedulingGroup` initialization.
interface ChannelBootstrap {};
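The comments in the IDL state an ordering contract: SetPeerPid must be called exactly once, before any call to Receive. A minimal sketch of that contract follows. ChannelEndpoint and its bool return values are hypothetical illustrations and not part of Chromium; the real generated interface methods return void.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the mojom Message struct (handles omitted).
struct Message {
  std::vector<uint8_t> bytes;
};

// Hypothetical endpoint that enforces the ordering described in the IDL
// comments. Returning bool to report a violation is an assumption made
// for this sketch only.
class ChannelEndpoint {
 public:
  // Accepts the peer PID once; a second call violates the contract.
  bool SetPeerPid(int32_t pid) {
    if (has_peer_pid_)
      return false;
    has_peer_pid_ = true;
    peer_pid_ = pid;
    return true;
  }

  // Accepts a message only after the peer PID is known.
  bool Receive(const Message& message) {
    if (!has_peer_pid_)
      return false;
    received_.push_back(message);
    return true;
  }

  int32_t peer_pid() const { return peer_pid_; }
  std::size_t received_count() const { return received_.size(); }

 private:
  bool has_peer_pid_ = false;
  int32_t peer_pid_ = 0;
  std::vector<Message> received_;
};
```

In this sketch a Receive call that arrives before SetPeerPid is rejected, and so is a second SetPeerPid call.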
As the IDL above shows, the Mojo interface is Channel. Its Receive method carries classic IPC messages: the sending side calls it and the receiving side implements it to handle the message. GetAssociatedInterface requests a Mojo interface associated with the Channel. The rest of this article is built around Channel.

IPC core

The main code lives in src/ipc.

https://pic3.zhimg.com/80/v2-515d4baee6e5810c9628bfc7e9bcf01e_720w.webp

The figure above shows how the main classes relate. Note the two interface classes Listener and Sender: they are the virtual interfaces for receiving and sending messages respectively, and the calls ultimately end up in MessagePipeReader. MessagePipeReader implements mojom::Channel and is the class that actually sends and handles IPC messages.

class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel {
 public:
  class Delegate {
   public:
    virtual void OnPeerPidReceived(int32_t peer_pid) = 0;
    virtual void OnMessageReceived(const Message& message) = 0;
    virtual void OnBrokenDataReceived() = 0;
    virtual void OnPipeError() = 0;
    virtual void OnAssociatedInterfaceRequest(
        mojo::GenericPendingAssociatedReceiver receiver) = 0;
  };
  .......
  bool Send(std::unique_ptr<Message> message);
  .......

 protected:
  void OnPipeClosed();
  void OnPipeError(MojoResult error);

 private:
  // mojom::Channel:
  void SetPeerPid(int32_t peer_pid) override;
  void Receive(MessageView message_view) override;
  void GetAssociatedInterface(
      mojo::GenericPendingAssociatedReceiver receiver) override;

  void ForwardMessage(mojo::Message message);

  // |delegate_| is null once the message pipe is closed.
  raw_ptr<Delegate> delegate_;  // Used to invoke message handling.
  mojo::AssociatedRemote<mojom::Channel> sender_;  // Used to send messages.
  std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
      thread_safe_sender_;
  mojo::AssociatedReceiver<mojom::Channel> receiver_;
  base::ThreadChecker thread_checker_;
  base::WeakPtrFactory<MessagePipeReader> weak_ptr_factory_{this};
};
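To make the Sender/Listener split concrete, here is a simplified in-process sketch. Message, PipeEnd and RecordingListener are hypothetical stand-ins (PipeEnd plays the role of MessagePipeReader, with a direct function call replacing the Mojo pipe); none of them are Chromium classes.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for IPC::Message.
struct Message {
  int routing_id;
  std::string payload;
};

// Plays the role of IPC::Sender: anything that can send a message.
class Sender {
 public:
  virtual ~Sender() = default;
  virtual bool Send(const Message& message) = 0;
};

// Plays the role of IPC::Listener: anything that can receive a message.
class Listener {
 public:
  virtual ~Listener() = default;
  virtual bool OnMessageReceived(const Message& message) = 0;
};

// Plays the role of MessagePipeReader: sends to the peer end and hands
// incoming messages to its listener. The "pipe" is a plain pointer here.
class PipeEnd : public Sender {
 public:
  explicit PipeEnd(Listener* listener) : listener_(listener) {}

  void ConnectTo(PipeEnd* peer) { peer_ = peer; }

  bool Send(const Message& message) override {
    if (!peer_)
      return false;  // Mirrors the "no sender_" early return.
    return peer_->Receive(message);
  }

 private:
  bool Receive(const Message& message) {
    return listener_->OnMessageReceived(message);
  }

  Listener* listener_;
  PipeEnd* peer_ = nullptr;
};

// Test helper that records every payload it receives.
class RecordingListener : public Listener {
 public:
  bool OnMessageReceived(const Message& message) override {
    received.push_back(message.payload);
    return true;
  }
  std::vector<std::string> received;
};
```

Connecting a renderer-side PipeEnd to a browser-side PipeEnd and calling Send on the former delivers the message to the browser-side listener.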
As the code above shows, MessagePipeReader inherits from mojom::Channel and holds a mojo::AssociatedRemote<mojom::Channel> sender_ member that is used to send messages.

Sending IPC messages

The figure below shows the message-sending flow, using RenderFrameImpl in the Renderer process as the example.

https://pic4.zhimg.com/80/v2-b5e1b1897ec9459e2bc59df6bcd9aa8b_720w.webp

As the figure shows, the call from RenderFrameImpl ends up in MessagePipeReader::Send, which calls sender_->Receive to send the message over the Mojo channel to the other process.

bool MessagePipeReader::Send(std::unique_ptr<Message> message) {
  CHECK(message->IsValid());
  TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Send",
                         message->flags(), TRACE_EVENT_FLAG_FLOW_OUT);
  absl::optional<std::vector<mojo::native::SerializedHandlePtr>> handles;
  MojoResult result = MOJO_RESULT_OK;
  result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
  if (result != MOJO_RESULT_OK)
    return false;

  if (!sender_)
    return false;

  base::span<const uint8_t> bytes(static_cast<const uint8_t*>(message->data()),
                                  message->size());
  sender_->Receive(MessageView(bytes, std::move(handles)));
  DVLOG(4) << "Send " << message->type() << ": " << message->size();
  return true;
}
Handling IPC messages

The figure below shows the message-handling flow, again using RenderFrameImpl in the Renderer process as the example.

https://pic1.zhimg.com/80/v2-99a19bf63e94b5baa997704d20a08f60_720w.webp

1. Mojo calls MessagePipeReader::Receive, which then hands the message on to the class responsible for it.

void MessagePipeReader::Receive(MessageView message_view) {
  if (message_view.bytes().empty()) {
    delegate_->OnBrokenDataReceived();
    return;
  }
  Message message(reinterpret_cast<const char*>(message_view.bytes().data()),
                  message_view.bytes().size());
  if (!message.IsValid()) {
    delegate_->OnBrokenDataReceived();
    return;
  }

  DVLOG(4) << "Receive " << message.type() << ": " << message.size();
  MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(
      message_view.TakeHandles(), &message);
  if (write_result != MOJO_RESULT_OK) {
    OnPipeError(write_result);
    return;
  }

  TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Receive",
                         message.flags(), TRACE_EVENT_FLAG_FLOW_IN);
  delegate_->OnMessageReceived(message);  // Dispatch through the delegate.
}
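The Send and Receive paths boil down to flattening a message into bytes on one side and validating and rebuilding it on the other, reporting broken data to the delegate when validation fails. The sketch below illustrates that round trip. The Header layout, Serialize, and the free function Receive are assumptions made for illustration; the real IPC::Message header and validity check are different and also deal with handles.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical wire header; not the real IPC::Message layout.
struct Header {
  uint32_t payload_size;
  int32_t routing_id;
  uint32_t type;
};

struct Message {
  int32_t routing_id = 0;
  uint32_t type = 0;
  std::string payload;
};

// Sender side: flatten the message into one contiguous byte buffer.
std::vector<uint8_t> Serialize(const Message& m) {
  Header h{static_cast<uint32_t>(m.payload.size()), m.routing_id, m.type};
  std::vector<uint8_t> bytes(sizeof(Header) + m.payload.size());
  std::memcpy(bytes.data(), &h, sizeof(Header));
  if (!m.payload.empty()) {
    std::memcpy(bytes.data() + sizeof(Header), m.payload.data(),
                m.payload.size());
  }
  return bytes;
}

class Delegate {
 public:
  virtual ~Delegate() = default;
  virtual void OnMessageReceived(const Message& message) = 0;
  virtual void OnBrokenDataReceived() = 0;
};

// Receiver side: validate the bytes, rebuild the message, then dispatch.
void Receive(const std::vector<uint8_t>& bytes, Delegate* delegate) {
  if (bytes.size() < sizeof(Header)) {  // Also covers the empty case.
    delegate->OnBrokenDataReceived();
    return;
  }
  Header h;
  std::memcpy(&h, bytes.data(), sizeof(Header));
  if (h.payload_size != bytes.size() - sizeof(Header)) {
    delegate->OnBrokenDataReceived();
    return;
  }
  Message m;
  m.routing_id = h.routing_id;
  m.type = h.type;
  m.payload.assign(
      reinterpret_cast<const char*>(bytes.data()) + sizeof(Header),
      h.payload_size);
  delegate->OnMessageReceived(m);
}

// Test helper that records what the receiver reported.
class RecordingDelegate : public Delegate {
 public:
  void OnMessageReceived(const Message& message) override {
    messages.push_back(message);
  }
  void OnBrokenDataReceived() override { ++broken_count; }
  std::vector<Message> messages;
  int broken_count = 0;
};
```

A well-formed buffer reaches OnMessageReceived unchanged, while an empty or truncated buffer is reported through OnBrokenDataReceived instead.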
2. In the handler above, delegate_ refers to ChannelMojo, which is declared in ipc/ipc_channel_mojo.h.

class COMPONENT_EXPORT(IPC) ChannelMojo
    : public Channel,
      public Channel::AssociatedInterfaceSupport,
      public internal::MessagePipeReader::Delegate {
 public:
  ......
  bool Send(Message* message) override;
  ......
  void OnMessageReceived(const Message& message) override;
  ......

 private:
  ......
  const mojo::MessagePipeHandle pipe_;
  std::unique_ptr<MojoBootstrap> bootstrap_;
  raw_ptr<Listener> listener_;
  std::unique_ptr<internal::MessagePipeReader> message_reader_;
  ......
};
MessagePipeReader::Delegate is one of ChannelMojo's base classes, and ChannelMojo::OnMessageReceived forwards the message for handling.

void ChannelMojo::OnMessageReceived(const Message& message) {
  const Message* message_ptr = &message;
  TRACE_IPC_MESSAGE_SEND("ipc,toplevel", "ChannelMojo::OnMessageReceived",
                         message_ptr);
  listener_->OnMessageReceived(message);
  if (message.dispatch_error())
    listener_->OnBadMessageReceived(message);
}
As the code above shows, the message is forwarded through listener_. Here listener_ is actually the ChannelProxy::Context, which is passed to ChannelMojo when the Channel is created.

void ChannelProxy::Context::CreateChannel(
    std::unique_ptr<ChannelFactory> factory) {
  base::AutoLock channel_lock(channel_lifetime_lock_);
  DCHECK(!channel_);
  DCHECK_EQ(factory->GetIPCTaskRunner(), ipc_task_runner_);
  channel_ = factory->BuildChannel(this);  // Create the Channel.

  Channel::AssociatedInterfaceSupport* support =
      channel_->GetAssociatedInterfaceSupport();
  if (support) {
    thread_safe_channel_ = support->CreateThreadSafeChannel();

    base::AutoLock filter_lock(pending_filters_lock_);
    for (auto& entry : pending_io_thread_interfaces_)
      support->AddGenericAssociatedInterface(entry.first, entry.second);
    pending_io_thread_interfaces_.clear();
  }
}
3. Next, ChannelProxy::Context::OnMessageReceived handles the message.

bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
  // First give a chance to the filters to process this message.
  if (!TryFilters(message))
    OnMessageReceivedNoFilter(message);
  return true;
}
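The filter-first pattern in OnMessageReceived can be sketched as follows. Modeling a filter as a std::function is an assumption made for brevity (Chromium uses filter objects), and the counter exists only so the behavior can be observed.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for IPC::Message.
struct Message {
  int routing_id;
  int type;
};

// A filter returns true when it has fully handled the message.
using Filter = std::function<bool(const Message&)>;

class Context {
 public:
  void AddFilter(Filter filter) { filters_.push_back(std::move(filter)); }

  // Filters get the first chance; only unhandled messages fall through.
  bool OnMessageReceived(const Message& message) {
    if (!TryFilters(message))
      OnMessageReceivedNoFilter(message);
    return true;
  }

  int unfiltered_count() const { return unfiltered_count_; }

 private:
  bool TryFilters(const Message& message) {
    for (const auto& filter : filters_) {
      if (filter(message))
        return true;  // The first filter that handles the message wins.
    }
    return false;
  }

  // In Chromium this posts a task to the listener thread; here it only
  // counts the messages that reached it.
  void OnMessageReceivedNoFilter(const Message&) { ++unfiltered_count_; }

  std::vector<Filter> filters_;
  int unfiltered_count_ = 0;
};
```

A message consumed by a filter never reaches OnMessageReceivedNoFilter, which is why the original function falls through only when TryFilters returns false.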
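When no filter handles the message, the code above ends up calling ChannelProxy::Context::OnMessageReceivedNoFilter, and there we find a PostTask. Why is the message handled by posting a task? Because IPC messages are not all handled on the same thread. When a message is processed, its routing_id is used to look up the matching TaskRunner, which guarantees that the message is handled on the correct thread. The code is as follows:

bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
  GetTaskRunner(message.routing_id())
      ->PostTask(FROM_HERE,
                 base::BindOnce(&Context::OnDispatchMessage, this, message));
  return true;
}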
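The lookup-then-post idea can be sketched with a toy task queue. TaskRunner here is a hypothetical single-threaded queue, not base::SingleThreadTaskRunner, and the fallback to a default runner for unknown routing IDs is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for IPC::Message.
struct Message {
  int routing_id;
  std::string payload;
};

// Toy task queue standing in for a per-thread task runner.
class TaskRunner {
 public:
  void PostTask(std::function<void()> task) {
    tasks_.push_back(std::move(task));
  }
  // Runs everything queued so far, as the owning thread would.
  void RunPending() {
    while (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
    }
  }
  std::size_t pending() const { return tasks_.size(); }

 private:
  std::deque<std::function<void()>> tasks_;
};

class Context {
 public:
  explicit Context(TaskRunner* default_runner)
      : default_runner_(default_runner) {}

  void SetRunnerForRoute(int routing_id, TaskRunner* runner) {
    runners_[routing_id] = runner;
  }

  // Queues the dispatch on the runner that owns this routing_id.
  bool OnMessageReceivedNoFilter(const Message& message) {
    GetTaskRunner(message.routing_id)->PostTask([this, message] {
      OnDispatchMessage(message);
    });
    return true;
  }

  const std::vector<std::string>& dispatched() const { return dispatched_; }

 private:
  // Assumption: unknown routes fall back to the default runner.
  TaskRunner* GetTaskRunner(int routing_id) {
    auto it = runners_.find(routing_id);
    return it == runners_.end() ? default_runner_ : it->second;
  }

  void OnDispatchMessage(const Message& message) {
    dispatched_.push_back(message.payload);
  }

  TaskRunner* default_runner_;
  std::map<int, TaskRunner*> runners_;
  std::vector<std::string> dispatched_;
};
```

Nothing is dispatched at the moment a message arrives. Each message waits in the queue of the runner chosen by its routing_id until that runner processes its tasks.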
The posted task then runs ChannelProxy::Context::OnDispatchMessage, which is where listener_->OnMessageReceived is actually called to handle the message. listener_ is a Listener; in this flow it is the AgentSchedulingGroup. The code is as follows:

// Called on the listener's thread
void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
  if (!listener_)
    return;

  OnDispatchConnected();

#if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
  Logging* logger = Logging::GetInstance();
  if (message.type() == IPC_LOGGING_ID) {
    logger->OnReceivedLoggingMessage(message);
    return;
  }

  if (logger->Enabled())
    logger->OnPreDispatchMessage(message);
#endif

  listener_->OnMessageReceived(message);  // The listener does the real work.
  if (message.dispatch_error())
    listener_->OnBadMessageReceived(message);

#if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
  if (logger->Enabled())
    logger->OnPostDispatchMessage(message);
#endif
}
4. AgentSchedulingGroup handles the message. Every message carries a routing_id, and that routing_id is used to find the corresponding listener.

bool AgentSchedulingGroup::OnMessageReceived(const IPC::Message& message) {
  DCHECK_NE(message.routing_id(), MSG_ROUTING_CONTROL);

  auto* listener = GetListener(message.routing_id());
  if (!listener)
    return false;

  return listener->OnMessageReceived(message);
}
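The routing step is a table lookup followed by a forwarded call. A minimal sketch follows, with hypothetical Group and Frame classes standing in for AgentSchedulingGroup and RenderFrameImpl, and std::map used in place of Chromium's own ID map container.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for IPC::Message.
struct Message {
  int routing_id;
  std::string payload;
};

class Listener {
 public:
  virtual ~Listener() = default;
  virtual bool OnMessageReceived(const Message& message) = 0;
};

// Stand-in for AgentSchedulingGroup: itself a Listener that forwards each
// message to the listener registered for the message's routing_id.
class Group : public Listener {
 public:
  void AddRoute(int routing_id, Listener* listener) {
    listeners_[routing_id] = listener;
  }
  void RemoveRoute(int routing_id) { listeners_.erase(routing_id); }

  bool OnMessageReceived(const Message& message) override {
    auto it = listeners_.find(message.routing_id);
    if (it == listeners_.end())
      return false;  // No listener for this route: message is unhandled.
    return it->second->OnMessageReceived(message);
  }

 private:
  std::map<int, Listener*> listeners_;
};

// Stand-in for RenderFrameImpl: records the payloads it was given.
class Frame : public Listener {
 public:
  bool OnMessageReceived(const Message& message) override {
    received.push_back(message.payload);
    return true;
  }
  std::vector<std::string> received;
};
```

Messages for an unregistered route return false without touching any listener, matching the early return in AgentSchedulingGroup::OnMessageReceived.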
5. The routing_id lookup above finds the RenderFrameImpl. IPC::Listener is one of RenderFrameImpl's base classes, so RenderFrameImpl::OnMessageReceived is called to handle the IPC message.

bool RenderFrameImpl::OnMessageReceived(const IPC::Message& msg) {
  // We may get here while detaching, when the WebFrame has been deleted. Do
  // not process any messages in this state.
  if (!frame_)
    return false;

  DCHECK(!frame_->GetDocument().IsNull());

  GetContentClient()->SetActiveURL(
      frame_->GetDocument().Url(),
      frame_->Top()->GetSecurityOrigin().ToString().Utf8());

  for (auto& observer : observers_) {
    if (observer.OnMessageReceived(msg))
      return true;
  }
  return false;
}
Managing IPC::Listener

Many classes manage IPC::Listener objects: RenderThreadImpl, AgentSchedulingGroup, AgentSchedulingGroupHost, ChildThreadImpl, and others. This section covers how AgentSchedulingGroup in the Renderer process manages them. The listeners are stored in its listener_map_ member.

https://pic3.zhimg.com/80/v2-7687964a8f92f5f0c7a43edb7dc891ae_720w.webp

A routing_id and its listener are registered through AgentSchedulingGroup::AddRoute.

void AgentSchedulingGroup::AddRoute(int32_t routing_id, Listener* listener) {
  DCHECK(!listener_map_.Lookup(routing_id));
  listener_map_.AddWithID(listener, routing_id);  // Store routing_id and listener.
  render_thread_.AddRoute(routing_id, listener);

  // See warning in `GetAssociatedInterface`.
  // Replay any `GetAssociatedInterface` calls for this route.
  auto range = pending_receivers_.equal_range(routing_id);
  for (auto iter = range.first; iter != range.second; ++iter) {
    ReceiverData& data = iter->second;
    listener->OnAssociatedInterfaceRequest(data.name,
                                           data.receiver.PassHandle());
  }
  pending_receivers_.erase(range.first, range.second);
}
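The replay loop in AddRoute implies that interface requests arriving before their route exists are parked and delivered later. The sketch below shows that pattern with a std::multimap. The queuing side (GetAssociatedInterface here) is an assumption inferred from the "Replay" comment, and interface requests are reduced to plain name strings.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

class Listener {
 public:
  virtual ~Listener() = default;
  virtual void OnAssociatedInterfaceRequest(const std::string& name) = 0;
};

// Hypothetical stand-in for AgentSchedulingGroup.
class Group {
 public:
  // Assumed behavior: deliver immediately if the route is known, otherwise
  // park the request until AddRoute is called for that routing_id.
  void GetAssociatedInterface(int routing_id, const std::string& name) {
    auto it = listener_map_.find(routing_id);
    if (it != listener_map_.end()) {
      it->second->OnAssociatedInterfaceRequest(name);
      return;
    }
    pending_receivers_.emplace(routing_id, name);
  }

  void AddRoute(int routing_id, Listener* listener) {
    assert(listener_map_.count(routing_id) == 0);
    listener_map_[routing_id] = listener;

    // Replay, in arrival order, every request parked for this route.
    auto range = pending_receivers_.equal_range(routing_id);
    for (auto iter = range.first; iter != range.second; ++iter)
      listener->OnAssociatedInterfaceRequest(iter->second);
    pending_receivers_.erase(range.first, range.second);
  }

  std::size_t pending_count() const { return pending_receivers_.size(); }

 private:
  std::map<int, Listener*> listener_map_;
  std::multimap<int, std::string> pending_receivers_;
};

// Test helper that records the requested interface names.
class RecordingListener : public Listener {
 public:
  void OnAssociatedInterfaceRequest(const std::string& name) override {
    names.push_back(name);
  }
  std::vector<std::string> names;
};
```

Since C++11, std::multimap keeps elements with equal keys in insertion order, so the parked requests for a route are replayed in the order they arrived.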
That is the end of the article. If anything here is inaccurate, corrections are welcome!