Neutron uses RabbitMQ almost exclusively for RPC, so it is worth studying how RPC is applied in Neutron, in several parts.
This part analyzes the RPC handling on the neutron server side, with linux bridge as the core plugin and kombu as the rpc_backend.
neutron server
In the Icehouse code, when neutron server starts it launches not only the API service but also an RPC service, which calls the core plugin's start_rpc_listener function.
By default, a core plugin (such as linux bridge or ovs) sets up RPC during its own initialization;
the ml2 code, in addition to the _setup_rpc function, also provides a start_rpc_listener function.
core plugin
Taking the linux bridge plugin as an example: when LinuxBridgePluginV2 is initialized, its _setup_rpc method sets up RPC.
def _setup_rpc(self):
    # RPC support
    self.service_topics = {svc_constants.CORE: topics.PLUGIN,
                           svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
    self.conn = rpc.create_connection(new=True)
    self.callbacks = LinuxBridgeRpcCallbacks()
    self.dispatcher = self.callbacks.create_rpc_dispatcher()
    for svc_topic in self.service_topics.values():
        self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
    # Consume from all consumers in a thread
    self.conn.consume_in_thread()
    self.notifier = AgentNotifierApi(topics.AGENT)
    self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
        dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
    )
    self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
        l3_rpc_agent_api.L3AgentNotify
    )
Processing flow:
1. Initialize service_topics: q-plugin and q-l3-plugin.
The q-plugin queue carries RPC messages from the l2 agents and dhcp agents to the plugin.
The q-l3-plugin queue carries RPC messages from the l3 agents to the plugin.
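For reference, the two topics above resolve to the queue names shown below. The constant values match neutron.common.topics, but this snippet is only an illustration; the string keys stand in for svc_constants.CORE and svc_constants.L3_ROUTER_NAT.

```python
# Illustrative values: the real constants live in neutron.common.topics.
PLUGIN = 'q-plugin'        # l2 agent / dhcp agent -> plugin
L3PLUGIN = 'q-l3-plugin'   # l3 agent -> plugin

# Mirrors the dict built in _setup_rpc (string keys are illustrative).
service_topics = {'CORE': PLUGIN, 'L3_ROUTER_NAT': L3PLUGIN}
for svc_topic in service_topics.values():
    print(svc_topic)  # one topic consumer is created per topic
```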
2. The create_connection method establishes the RPC connection, using kombu's create_connection method:
def create_connection(conf, new=True):
    """Create a connection."""
    return rpc_amqp.create_connection(
        conf, new,
        rpc_amqp.get_connection_pool(conf, Connection))
amqp's get_connection_pool method obtains a connection pool, then amqp's create_connection method is called, returning a ConnectionContext object.
3. Instantiate the callbacks object, an instance of LinuxBridgeRpcCallbacks, which handles incoming messages.
LinuxBridgeRpcCallbacks inherits from the following three classes:
class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
                              l3_rpc_base.L3RpcCallbackMixin,
                              sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
4. Create the rpc dispatcher from the callbacks object; this returns a PluginRpcDispatcher.
Its dispatch method is what the _process_data method (shown later) ultimately calls.
This dispatcher is what the amqp layer calls the "proxy object": the object on which the method named in an incoming message is actually looked up and invoked.
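The "proxy object" idea can be illustrated with a minimal dispatcher. This is a hypothetical simplification of rpc.dispatcher.RpcDispatcher (the real dispatch also takes version and namespace arguments); the callback class and method name are invented for the example.

```python
class MiniDispatcher(object):
    """Holds callback objects and routes a (method, args) pair to them."""
    def __init__(self, callbacks):
        self.callbacks = callbacks

    def dispatch(self, ctxt, method, **kwargs):
        # Find the first callback object that exposes the requested method.
        for proxyobj in self.callbacks:
            if hasattr(proxyobj, method):
                return getattr(proxyobj, method)(ctxt, **kwargs)
        raise AttributeError('unknown RPC method: %s' % method)

class DhcpCallbacks(object):
    """Hypothetical callback mixin with one RPC-exposed method."""
    def get_active_networks(self, ctxt):
        return ['net-1', 'net-2']

dispatcher = MiniDispatcher([DhcpCallbacks()])
print(dispatcher.dispatch({}, 'get_active_networks'))  # ['net-1', 'net-2']
```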
5. Create two consumers, one per service_topic, each using the dispatcher:
def create_consumer(self, topic, proxy, fanout=False):
    self.connection.create_consumer(topic, proxy, fanout)
This calls impl_kombu's create_consumer method to create the consumer:
def create_consumer(self, topic, proxy, fanout=False):
    """Create a consumer that calls a method in a proxy object."""
    proxy_cb = rpc_amqp.ProxyCallback(
        self.conf, proxy,
        rpc_amqp.get_connection_pool(self.conf, Connection))
    self.proxy_callbacks.append(proxy_cb)
    if fanout:
        self.declare_fanout_consumer(topic, proxy_cb)
    else:
        self.declare_topic_consumer(topic, proxy_cb)
proxy_cb is a ProxyCallback instance; it serves as the consumer callback.
The ProxyCallback class:
Its __call__ method reads the method, args, and other fields from the message, then spawns a new green thread to run _process_data.
class ProxyCallback(_ThreadPoolWithWait):
    """Calls methods on a proxy object based on method and args."""

    def __init__(self, conf, proxy, connection_pool):
        super(ProxyCallback, self).__init__(
            conf=conf,
            connection_pool=connection_pool,
        )
        self.proxy = proxy
        self.msg_id_cache = _MsgIdCache()

    def __call__(self, message_data):
        """Consumer callback to call a method on a proxy object.

        Parses the message for validity and fires off a thread to call the
        proxy object method.

        Message data should be a dictionary with two keys:
            method: string representing the method to call
            args: dictionary of arg: value

        Example: {'method': 'echo', 'args': {'value': 42}}
        """
        # It is important to clear the context here, because at this point
        # the previous context is stored in local.store.context
        if hasattr(local.store, 'context'):
            del local.store.context
        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
        self.msg_id_cache.check_duplicate_message(message_data)
        ctxt = unpack_context(self.conf, message_data)
        method = message_data.get('method')
        args = message_data.get('args', {})
        version = message_data.get('version')
        namespace = message_data.get('namespace')
        if not method:
            LOG.warn(_('no method for message: %s') % message_data)
            ctxt.reply(_('No method for message: %s') % message_data,
                       connection_pool=self.connection_pool)
            return
        self.pool.spawn_n(self._process_data, ctxt, version, method,
                          namespace, args)
The _process_data method:
_process_data handles the message according to its method field, by calling the dispatch method of the proxy (i.e. the dispatcher) passed in earlier.
def _process_data(self, ctxt, version, method, namespace, args):
    """Process a message in a new thread.

    If the proxy object we have has a dispatch method
    (see rpc.dispatcher.RpcDispatcher), pass it the version,
    method, and args and let it dispatch as appropriate.  If not, use
    the old behavior of magically calling the specified method on the
    proxy we have here.
    """
    ctxt.update_store()
    try:
        rval = self.proxy.dispatch(ctxt, version, method, namespace,
                                   **args)
        # Check if the result was a generator
        if inspect.isgenerator(rval):
            for x in rval:
                ctxt.reply(x, None, connection_pool=self.connection_pool)
        else:
            ctxt.reply(rval, None, connection_pool=self.connection_pool)
        # This final None tells multicall that it is done.
        ctxt.reply(ending=True, connection_pool=self.connection_pool)
    except rpc_common.ClientException as e:
        LOG.debug(_('Expected exception during message handling (%s)') %
                  e._exc_info[1])
        ctxt.reply(None, e._exc_info,
                   connection_pool=self.connection_pool,
                   log_failure=False)
    except Exception:
        # sys.exc_info() is deleted by LOG.exception().
        exc_info = sys.exc_info()
        LOG.error(_('Exception during message handling'),
                  exc_info=exc_info)
        ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
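The flow from __call__ through _process_data can be simulated without a broker. The sketch below uses hypothetical stand-in classes (EchoManager, FakeContext) to mimic how the message envelope is parsed and dispatched, including the generator handling that multicall relies on:

```python
import inspect

class EchoManager(object):
    """Stand-in for a callbacks/dispatcher object (hypothetical)."""
    def echo(self, ctxt, value):
        return value

    def count(self, ctxt, upto):
        # A generator result: _process_data replies once per item.
        for i in range(upto):
            yield i

class FakeContext(object):
    """Records replies instead of publishing them back via amqp."""
    def __init__(self):
        self.replies = []

    def reply(self, rval=None, ending=False):
        self.replies.append('DONE' if ending else rval)

def process_data(proxy, ctxt, method, args):
    """Simplified _process_data: invoke the method, reply with results."""
    rval = getattr(proxy, method)(ctxt, **args)
    if inspect.isgenerator(rval):
        for x in rval:
            ctxt.reply(x)
    else:
        ctxt.reply(rval)
    ctxt.reply(ending=True)   # tells multicall the call is finished

message_data = {'method': 'count', 'args': {'upto': 3}}
ctxt = FakeContext()
process_data(EchoManager(), ctxt,
             message_data.get('method'), message_data.get('args', {}))
print(ctxt.replies)  # [0, 1, 2, 'DONE']
```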
Two topic consumers are declared. declare_topic_consumer fixes the consumer type, queue name, and exchange name, and adds each newly built consumer to the consumers list:
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
                           exchange_name=None, ack_on_error=True):
    """Create a 'topic' consumer."""
    self.declare_consumer(functools.partial(TopicConsumer,
                                            name=queue_name,
                                            exchange_name=exchange_name,
                                            ack_on_error=ack_on_error,
                                            ),
                          topic, callback)
The declare_consumer function:
def declare_consumer(self, consumer_cls, topic, callback):
    """Create a Consumer using the class that was passed in and
    add it to our list of consumers
    """

    def _connect_error(exc):
        log_info = {'topic': topic, 'err_str': str(exc)}
        LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                    "%(err_str)s") % log_info)

    def _declare_consumer():
        consumer = consumer_cls(self.conf, self.channel, topic, callback,
                                six.next(self.consumer_num))
        self.consumers.append(consumer)
        return consumer

    return self.ensure(_connect_error, _declare_consumer)
The TopicConsumer class:
class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'."""

    def __init__(self, conf, channel, topic, callback, tag, name=None,
                 exchange_name=None, **kwargs):
        """Init a 'topic' queue.

        :param channel: the amqp channel to use
        :param topic: the topic to listen on
        :paramtype topic: str
        :param callback: the callback to call when messages are received
        :param tag: a unique ID for the consumer on the channel
        :param name: optional queue name, defaults to topic
        :paramtype name: str

        Other kombu options may be passed as keyword arguments
        """
        # Default options
        options = {'durable': conf.amqp_durable_queues,
                   'queue_arguments': _get_queue_arguments(conf),
                   'auto_delete': conf.amqp_auto_delete,
                   'exclusive': False}
        options.update(kwargs)
        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
        exchange = kombu.entity.Exchange(name=exchange_name,
                                         type='topic',
                                         durable=options['durable'],
                                         auto_delete=options['auto_delete'])
        super(TopicConsumer, self).__init__(channel,
                                            callback,
                                            tag,
                                            name=name or topic,
                                            exchange=exchange,
                                            routing_key=topic,
                                            **options)
6. The consume_in_thread method starts consuming messages.
def consume_in_thread(self):
    return self.connection.consume_in_thread()
This calls kombu's consume_in_thread method, which spawns a green thread to run _consumer_thread, which in turn calls consume.
def consume_in_thread(self):
    """Consumer from all queues/consumers in a greenthread."""
    @excutils.forever_retry_uncaught_exceptions
    def _consumer_thread():
        try:
            self.consume()
        except greenlet.GreenletExit:
            return
    if self.consumer_thread is None:
        self.consumer_thread = eventlet.spawn(_consumer_thread)
    return self.consumer_thread
It then calls the Connection class's consume method:
def consume(self, limit=None):
    """Consume from all queues/consumers."""
    it = self.iterconsume(limit=limit)
    while True:
        try:
            six.next(it)
        except StopIteration:
            return
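The consume/iterconsume pair is simply a bounded generator loop. A minimal broker-free sketch of the pattern (hypothetical names; `next(it)` stands in for connection.drain_events):

```python
import itertools

def iterconsume(events, limit=None):
    """Yield one drained event per iteration, up to an optional limit."""
    it = iter(events)
    for iteration in itertools.count(0):
        if limit and iteration >= limit:
            # Note: the original oslo code used `raise StopIteration` here,
            # which PEP 479 later turned into a RuntimeError inside
            # generators; `return` is the modern equivalent.
            return
        try:
            yield next(it)   # stands in for connection.drain_events()
        except StopIteration:
            return

def consume(events, limit=None):
    """Drive the iterator to exhaustion, as Connection.consume does."""
    drained = []
    for event in iterconsume(events, limit=limit):
        drained.append(event)
    return drained

print(consume(['e1', 'e2', 'e3'], limit=2))  # ['e1', 'e2']
```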
consume calls the Connection class's iterconsume method:
inside it, the _consume closure calls consume differently depending on the consumer type (nowait for the topic consumers, blocking for the fanout consumer at the tail).
def iterconsume(self, limit=None, timeout=None):
    """Return an iterator that will consume from all queues/consumers."""

    info = {'do_consume': True}

    def _error_callback(exc):
        if isinstance(exc, socket.timeout):
            LOG.debug(_('Timed out waiting for RPC response: %s') %
                      str(exc))
            raise rpc_common.Timeout()
        else:
            LOG.exception(_('Failed to consume message from queue: %s') %
                          str(exc))
            info['do_consume'] = True

    def _consume():
        if info['do_consume']:
            queues_head = self.consumers[:-1]  # not fanout.
            queues_tail = self.consumers[-1]  # fanout
            for queue in queues_head:
                queue.consume(nowait=True)
            queues_tail.consume(nowait=False)
            info['do_consume'] = False
        return self.connection.drain_events(timeout=timeout)

    for iteration in itertools.count(0):
        if limit and iteration >= limit:
            raise StopIteration
        yield self.ensure(_error_callback, _consume)
Within _consume, the consume method of TopicConsumer's parent class, ConsumerBase, is invoked, and then connection.drain_events processes the events on all channels.
ConsumerBase's consume method:
def consume(self, *args, **kwargs):
    """Actually declare the consumer on the amqp channel.  This will
    start the flow of messages from the queue.  Using the
    Connection.iterconsume() iterator will process the messages,
    calling the appropriate callback.

    If a callback is specified in kwargs, use that.  Otherwise,
    use the callback passed during __init__()

    If kwargs['nowait'] is True, then this call will block until
    a message is read.
    """
    options = {'consumer_tag': self.tag}
    options['nowait'] = kwargs.get('nowait', False)
    callback = kwargs.get('callback', self.callback)
    if not callback:
        raise ValueError("No callback defined")

    def _callback(raw_message):
        message = self.channel.message_to_python(raw_message)
        self._callback_handler(message, callback)

    self.queue.consume(*args, callback=_callback, **options)
A registration function, _callback, is defined here: it converts the raw message into a Python object,
and _callback_handler then processes the message and acknowledges it.
def _callback_handler(self, message, callback):
    """Call callback with deserialized message.

    Messages that are processed without exception are ack'ed.

    If the message processing generates an exception, it will be
    ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
    """

    try:
        msg = rpc_common.deserialize_msg(message.payload)
        callback(msg)
    except Exception:
        if self.ack_on_error:
            LOG.exception(_("Failed to process message"
                            " ... skipping it."))
            message.ack()
        else:
            LOG.exception(_("Failed to process message"
                            " ... will requeue."))
            message.requeue()
    else:
        message.ack()
The callback(msg) call is the key step: the callback passed down here is the ProxyCallback object from earlier, so its __call__ method is invoked and message processing begins.
With that, the whole consumer setup is complete.
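The ack/requeue decision in _callback_handler can be exercised with a fake message object. This is a broker-free sketch with hypothetical classes; it keeps only the control flow of the quoted code:

```python
class FakeMessage(object):
    """Stands in for a kombu message: records what happened to it."""
    def __init__(self, payload):
        self.payload = payload
        self.state = 'pending'

    def ack(self):
        self.state = 'acked'

    def requeue(self):
        self.state = 'requeued'

def callback_handler(message, callback, ack_on_error=True):
    """Simplified _callback_handler: ack on success, else ack or requeue."""
    try:
        callback(message.payload)
    except Exception:
        if ack_on_error:
            message.ack()       # drop the poison message
        else:
            message.requeue()   # let another consumer retry it
    else:
        message.ack()

good = FakeMessage({'method': 'echo'})
callback_handler(good, lambda msg: None)
print(good.state)  # acked

bad = FakeMessage({'method': 'boom'})
callback_handler(bad, lambda msg: 1 / 0, ack_on_error=False)
print(bad.state)  # requeued
```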
7. Initialize the notifier, an AgentNotifierApi object used to send RPC messages to the l2 agents; the topic prefix is q-agent-notifier.
There are three message types: network-delete, port-update, and a third, inherited from the parent class, for security groups: security_group-update.
There are therefore three exchanges:
q-agent-notifier-network-delete_fanout, q-agent-notifier-port-update_fanout, and q-agent-notifier-security_group-update_fanout,
with three corresponding kinds of queues:
$ sudo rabbitmqctl list_queues name |grep q-agent-notifier
q-agent-notifier-network-delete_fanout_1a56043909134873b21a21b5bb811e7e
q-agent-notifier-port-update_fanout_b8069f4d79e949f584036fd3957f5dba
q-agent-notifier-security_group-update_fanout_c81f76cb3a664f24b2878e5b9a773500
Fanout messages are sent on network delete and port update,
and whenever a security group's rules, members, or providers are updated.
8. Initialize the dhcp agent notifier and l3 agent notifier,
used to notify the dhcp agents and l3 agents; the topics are dhcp_agent and l3_agent.
DhcpAgentNotifyAPI and L3AgentNotify each send messages in two ways: cast and fanout_cast.
The cast method sends the message through a TopicPublisher and notifies a specific host: the exchange is neutron, and the queue is dhcp_agent.host or l3_agent.host.
The fanout_cast method sends the message through a FanoutPublisher and notifies all dhcp agents or all l3 agents: the exchange is dhcp_agent_fanout or l3_agent_fanout, and the queue is dhcp_agent_fanout_uuid or l3_agent_fanout_uuid.
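The naming convention described above can be summarized in a small helper. This function is hypothetical, invented only to restate the exchange/queue names listed in this section (with 'uuid' standing in for the per-consumer unique suffix):

```python
def rpc_target(topic, host=None, fanout=False, uuid='uuid'):
    """Return (exchange, queue) for a cast, per the naming scheme above.

    'neutron' is the default control exchange; 'uuid' stands in for the
    unique suffix each fanout consumer appends to its queue name.
    """
    if fanout:
        # fanout_cast: per-consumer queue bound to a fanout exchange
        return ('%s_fanout' % topic, '%s_fanout_%s' % (topic, uuid))
    # cast: topic exchange, queue named topic.host
    return ('neutron', '%s.%s' % (topic, host))

print(rpc_target('dhcp_agent', host='node1'))
# ('neutron', 'dhcp_agent.node1')
print(rpc_target('l3_agent', fanout=True))
# ('l3_agent_fanout', 'l3_agent_fanout_uuid')
```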