企业网站改版价格,wordpress第三方登陆,建设银行办信用卡网站,百度收录规则

目录

- 完整代码
- 代码解释
  - 1. 消息的数据类
  - 2. 创建代理人（MyAgent）
  - 3. 创建和运行代理人的运行时环境
  - 4. 根据发送者路由消息的代理（RoutedBySenderAgent）
  - 5. 创建和运行带路由的代理
  - 6. 内外代理的通信（InnerAgent 和 OuterAgent）
  - 7. 运行内外代理
  - 8. 消息广播
    - 1. ReceivingAgent（接收消息的代理）
    - 2. BroadcastingAgent（发布消息的代理）
    - 3. 注册代理并处理消息
    - 4. 默认主题与订阅
    - 5. 注册代理并启动消息发布
- 类似例子

完整代码
# Complete listing: direct messaging, handler routing and topic broadcasting
# between autogen_core agents.
#
# The listing was originally written notebook-style with top-level ``await``.
# Each example is wrapped in its own coroutine here so the file is valid as a
# plain Python script; every example builds its own runtime, so the repeated
# agent type names do not clash.

import asyncio
from dataclasses import dataclass

from autogen_core import (
    AgentId,
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    default_subscription,
    message_handler,
    type_subscription,
)


@dataclass
class TextMessage:
    """Text message: the text itself plus the name of its sender."""

    content: str
    source: str


@dataclass
class ImageMessage:
    """Image message: the image URL plus the name of its sender."""

    url: str
    source: str


class MyAgent(RoutedAgent):
    """Agent with one handler per message type."""

    @message_handler
    async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
        """Print a greeting that echoes the sender and the text content."""
        print(f"Hello, {message.source}, you said {message.content}!")

    @message_handler
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        """Print a greeting that echoes the sender and the image URL."""
        print(f"Hello, {message.source}, you sent me {message.url}!")


async def run_type_routing_example() -> None:
    """Send one text and one image message directly to ``MyAgent``.

    Output recorded in the original article:
        Hello, User, you said Hello, World!!
        Hello, User, you sent me https://example.com/image.jpg!
    """
    runtime = SingleThreadedAgentRuntime()
    # In the original notebook this call displayed AgentType(type='my_agent').
    await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
    runtime.start()
    agent_id = AgentId("my_agent", "default")
    await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
    await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
    await runtime.stop_when_idle()


class RoutedBySenderAgent(RoutedAgent):
    """Agent whose handlers are additionally filtered by the ``source`` prefix."""

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignore
    async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
        """Handle text messages whose source starts with ``user1``."""
        print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
        """Handle text messages whose source starts with ``user2``."""
        print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        """Handle image messages whose source starts with ``user2``."""
        print(f"Hello, {message.source}, you sent me {message.url}!")


async def run_sender_routing_example() -> None:
    """Send text and image messages from two senders to ``RoutedBySenderAgent``.

    Output recorded in the original article (no line appears for the image
    sent by ``user1-test``):
        Hello from user 1 handler, user1-test, you said Hello, World!!
        Hello from user 2 handler, user2-test, you said Hello, World!!
        Hello, user2-test, you sent me https://example.com/image.jpg!
    """
    runtime = SingleThreadedAgentRuntime()
    await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
    runtime.start()
    agent_id = AgentId("my_agent", "default")
    await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
    await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
    await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
    await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
    await runtime.stop_when_idle()


@dataclass
class Message:
    """Generic message with a single text field."""

    content: str


class InnerAgent(RoutedAgent):
    """Agent that answers every ``Message`` with a new ``Message``."""

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        """Return a reply that prefixes the received content."""
        return Message(content=f"Hello from inner, {message.content}")


class OuterAgent(RoutedAgent):
    """Agent that forwards each message to an inner agent and prints the reply."""

    def __init__(self, description: str, inner_agent_type: str):
        super().__init__(description)
        # The inner agent is addressed with the same key as this agent.
        self.inner_agent_id = AgentId(inner_agent_type, self.id.key)

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        """Print the message, query the inner agent, then print its response."""
        print(f"Received message: {message.content}")
        # Send a direct message to the inner agent and receive a response.
        response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)
        print(f"Received inner response: {response.content}")


async def run_inner_outer_example() -> None:
    """Send one message to ``OuterAgent``, which relays it to ``InnerAgent``.

    Output recorded in the original article:
        Received message: Hello, World!
        Received inner response: Hello from inner, Hello from outer, Hello, World!
    """
    runtime = SingleThreadedAgentRuntime()
    await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
    await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
    runtime.start()
    outer_agent_id = AgentId("outer_agent", "default")
    await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
    await runtime.stop_when_idle()


@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
    """Agent subscribed to the ``default`` topic type that prints what it receives."""

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        """Print the content of the received message."""
        print(f"Received a message: {message.content}")


class BroadcastingAgent(RoutedAgent):
    """Agent that publishes a fixed message to the ``default`` topic when triggered."""

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        """Publish a message, using this agent's key as the topic source."""
        await self.publish_message(
            Message("Publishing a message from broadcasting agent!"),
            topic_id=TopicId(type="default", source=self.id.key),
        )


async def run_type_subscription_example() -> None:
    """Publish to the ``default`` topic with both agents subscribed to it.

    Output recorded in the original article:
        Received a message: Hello, World! From the runtime!
        Received a message: Publishing a message from broadcasting agent!
    """
    runtime = SingleThreadedAgentRuntime()

    # Option 1: with type_subscription decorator
    # The type_subscription class decorator automatically adds a TypeSubscription to
    # the runtime when the agent is registered.
    await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))

    # Option 2: with TypeSubscription
    await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
    await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))

    # Start the runtime and publish a message.
    runtime.start()
    await runtime.publish_message(
        Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
    )
    await runtime.stop_when_idle()


@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
    """Broadcasting agent that uses the default subscription and ``DefaultTopicId``."""

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        """Publish a fixed message to the default topic."""
        # Publish a message to all agents in the same namespace.
        await self.publish_message(
            Message("Publishing a message from broadcasting agent!"),
            topic_id=DefaultTopicId(),
        )


async def run_default_topic_example() -> None:
    """Publish to ``DefaultTopicId()`` with both agents registered.

    Output recorded in the original article:
        Received a message: Hello, World! From the runtime!
        Received a message: Publishing a message from broadcasting agent!
    """
    runtime = SingleThreadedAgentRuntime()
    await BroadcastingAgentDefaultTopic.register(
        runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
    )
    await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
    runtime.start()
    await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
    await runtime.stop_when_idle()


async def main() -> None:
    """Run the examples one after another, in the order the article presents them."""
    await run_type_routing_example()
    await run_sender_routing_example()
    await run_inner_outer_example()
    await run_type_subscription_example()
    await run_default_topic_example()


if __name__ == "__main__":
    asyncio.run(main())
Received a message: Publishing a message from broadcasting agent!

代码解释
1. 消息的数据类
```python
from dataclasses import dataclass


@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class ImageMessage:
    url: str
    source: str
```

这段代码定义了两个数据类：TextMessage 和 ImageMessage，它们分别用于表示文本消息和图片消息。

- TextMessage 包含两个字段：content（消息内容）和 source（发送者）。
- ImageMessage 包含 url（图片的链接地址）和 source（发送者）。
2. 创建代理人（MyAgent）

```python
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


class MyAgent(RoutedAgent):
    @message_handler
    async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you said {message.content}!")

    @message_handler
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you sent me {message.url}!")
```

MyAgent 继承自 RoutedAgent 类，并定义了两个消息处理器：

- on_text_message：处理 TextMessage 消息，打印出发送者和内容。
- on_image_message：处理 ImageMessage 消息，打印出发送者和图片的链接地址。
3. 创建和运行代理人的运行时环境
```python
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()
```

这段代码启动了一个单线程的运行时环境 SingleThreadedAgentRuntime。

- 注册了 MyAgent 代理，代理名称为 my_agent。
- 使用 runtime.send_message 向代理发送了一条文本消息和一条图片消息。
- 最后，运行时环境会在空闲时停止。
4. 根据发送者路由消息的代理（RoutedBySenderAgent）

```python
class RoutedBySenderAgent(RoutedAgent):
    @message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignore
    async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you sent me {message.url}!")
```

RoutedBySenderAgent 继承自 RoutedAgent，并通过 message_handler 装饰器的 match 参数，根据消息的发送者（source）来路由消息。

- 如果消息来自以 user1 开头的发送者，它会调用 on_user1_message 处理文本消息。
- 如果消息来自以 user2 开头的发送者，它会调用 on_user2_message 处理文本消息。
- 如果是来自 user2 的图片消息，则调用 on_image_message 进行处理。
5. 创建和运行带路由的代理
```python
runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
```

在这个部分，运行时环境启动并注册了 RoutedBySenderAgent 代理。

- 发送了不同来源的文本消息和图片消息。
- 根据消息的 source 字段，消息会被路由到对应的处理函数。
- 最终，代理会打印不同的响应信息，取决于消息的发送者。
6. 内外代理的通信（InnerAgent 和 OuterAgent）

```python
@dataclass
class Message:
    content: str


class InnerAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"Hello from inner, {message.content}")


class OuterAgent(RoutedAgent):
    def __init__(self, description: str, inner_agent_type: str):
        super().__init__(description)
        self.inner_agent_id = AgentId(inner_agent_type, self.id.key)

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")
        response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)
        print(f"Received inner response: {response.content}")
```

- InnerAgent 是一个简单的代理，它收到消息后返回一个修改过的消息。
- OuterAgent 也作为代理存在，并保存了 InnerAgent 的 AgentId。它会发送消息给 InnerAgent，并等待回应。
7. 运行内外代理
```python
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
```

这个部分启动了 InnerAgent 和 OuterAgent 代理。

- OuterAgent 会发送一条消息给 InnerAgent，并接收到来自 InnerAgent 的响应。
- 最终输出显示了 OuterAgent 接收到的消息，以及 InnerAgent 的响应。
8. 消息广播
这一部分涉及了通过 topic 和 subscription 实现的消息广播和接收机制。它展示了如何通过 topic 来发布消息以及如何订阅消息。
1. ReceivingAgent（接收消息的代理）

```python
from autogen_core import RoutedAgent, message_handler, type_subscription


@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Received a message: {message.content}")
```

ReceivingAgent 继承自 RoutedAgent，并使用了 @type_subscription(topic_type="default") 装饰器。这个装饰器将代理与某个类型的 topic（本例中是 default）关联。

当收到来自 default 主题的消息时，代理会调用 on_my_message 方法，并打印出消息内容。
2. BroadcastingAgent（发布消息的代理）

```python
from autogen_core import TopicId


class BroadcastingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        await self.publish_message(
            Message("Publishing a message from broadcasting agent!"),
            topic_id=TopicId(type="default", source=self.id.key),
        )
```

BroadcastingAgent 也继承自 RoutedAgent。它定义了一个消息处理方法：当收到消息时，它会发布一条新的消息，消息内容为 “Publishing a message from broadcasting agent!”。

这条消息会发布到 default 主题，且 topic_id 以代理的 key（即 self.id.key）作为 source。
3. 注册代理并处理消息
```python
from autogen_core import TypeSubscription

runtime = SingleThreadedAgentRuntime()

# Option 1: with type_subscription decorator
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))

# Option 2: with TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))

# Start the runtime and publish a message.
runtime.start()
await runtime.publish_message(
    Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()
```

- runtime 创建了一个单线程的运行时环境。
- 选项 1：通过 type_subscription 装饰器，ReceivingAgent 被注册到运行时后，会订阅 default 主题。
- 选项 2：BroadcastingAgent 注册后，通过 runtime.add_subscription() 将其与 default 主题的消息订阅关联起来。
- runtime.start() 启动运行时，随后发布了一条消息 “Hello, World! From the runtime!”，这条消息会被所有订阅 default 主题的代理接收。
- 由于 ReceivingAgent 订阅了 default 主题，它会接收到来自 runtime.publish_message 的消息，并输出 “Received a message: Hello, World! From the runtime!”。
- 之后，BroadcastingAgent 发布了一条消息 “Publishing a message from broadcasting agent!”，并发送到 default 主题。
4. 默认主题与订阅
```python
from autogen_core import DefaultTopicId, default_subscription


@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        # Publish a message to all agents in the same namespace.
        await self.publish_message(
            Message("Publishing a message from broadcasting agent!"),
            topic_id=DefaultTopicId(),
        )
```

BroadcastingAgentDefaultTopic 通过 @default_subscription 装饰器订阅了一个特殊的默认主题（DefaultTopicId()）。

当它收到消息时，它会将 “Publishing a message from broadcasting agent!” 消息发布到同一命名空间的所有代理，所有订阅该主题的代理都会收到这条消息。
5. 注册代理并启动消息发布
```python
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(
    runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
```

- 注册了 BroadcastingAgentDefaultTopic 和 ReceivingAgent 两个代理。
- 在运行时启动后，runtime.publish_message 发布了一条消息 “Hello, World! From the runtime!” 到默认主题 DefaultTopicId()，这个消息会被 ReceivingAgent 接收并打印。
- 随后，BroadcastingAgentDefaultTopic 会发布一条消息 “Publishing a message from broadcasting agent!”，并将其广播到同一个命名空间中的所有代理；ReceivingAgent 收到这条消息并打印。
类似例子
```python
from autogen_core import RoutedAgent, message_handler, type_subscription, TopicId


# Agent that will receive status updates
@type_subscription(topic_type="status")
class StatusReceiverAgent(RoutedAgent):
    @message_handler
    async def on_status_update(self, message: Message, ctx: MessageContext) -> None:
        print(f"Status received: {message.content}")


# Agent that publishes status updates
@type_subscription(topic_type="command")
class StatusPublisherAgent(RoutedAgent):
    @message_handler
    async def on_command_message(self, message: Message, ctx: MessageContext) -> None:
        # Publish a status update to the status topic
        await self.publish_message(
            Message("System is up and running!"),
            topic_id=TopicId(type="status", source=self.id.key),
        )


# Agent runtime and agent registration
runtime = SingleThreadedAgentRuntime()

# Register the status receiver and publisher agents
await StatusReceiverAgent.register(runtime, "status_receiver", lambda: StatusReceiverAgent("Status Receiver"))
await StatusPublisherAgent.register(runtime, "status_publisher", lambda: StatusPublisherAgent("Status Publisher"))

# Start the runtime and simulate sending a command to the publisher
runtime.start()

# The publisher publishes a status update when it receives a message (e.g., "send status update")
await runtime.publish_message(Message("send status update"), topic_id=TopicId(type="command", source="status_publisher"))

# The receiver will print the status update message
await runtime.stop_when_idle()
```

输出：

```
Status received: System is up and running!
```

参考链接：https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/framework/message-and-communication.html