最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

详解Python的Twisted框架中reactor事件管理器的用法

来源:动视网 责编:小采 时间:2020-11-27 14:16:49
文档

详解Python的Twisted框架中reactor事件管理器的用法

详解Python的Twisted框架中reactor事件管理器的用法:铺垫在大量的实践中,似乎我们总是通过类似的方式来使用异步编程:监听事件事件发生执行对应的回调函数回调完成(可能产生新的事件添加进监听队列)回到1,监听事件因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似
推荐度:
导读详解Python的Twisted框架中reactor事件管理器的用法:铺垫在大量的实践中,似乎我们总是通过类似的方式来使用异步编程:监听事件事件发生执行对应的回调函数回调完成(可能产生新的事件添加进监听队列)回到1,监听事件因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似
 铺垫
在大量的实践中,似乎我们总是通过类似的方式来使用异步编程:

监听事件

事件发生执行对应的回调函数

回调完成(可能产生新的事件添加进监听队列)

回到1,监听事件

因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似于Reactor loop,主线程的Run Loop监听屏幕UI事件,一旦发生UI事件则执行对应的事件处理代码,还可以通过GCD等方式产生事件至主线程执行。

上图是boost对Reactor模式的描绘,Twisted的设计就是基于这样的Reactor模式,Twisted程序就是在等待事件、处理事件的过程中不断循环。

from twisted.internet import reactor
reactor.run()

reactor是Twisted程序中的单例对象。

reactor
reactor是事件管理器,用于注册、注销事件,运行事件循环,当事件发生时调用回调函数处理。关于reactor有下面几个结论:

Twisted的reactor只有通过调用reactor.run()来启动。

reactor循环是在其开始的进程中运行,也就是运行在主进程中。

一旦启动,就会一直运行下去。reactor就会在程序的控制下(或者具体在一个启动它的线程的控制下)。

reactor循环并不会消耗任何CPU的资源。

并不需要显式的创建reactor,只需要引入就OK了。

最后一条需要解释清楚。在Twisted中,reactor是Singleton(也就是单例模式),即在一个程序中只能有一个reactor,并且只要你引入它就相应地创建一个。上面引入的方式这是twisted默认使用的方法,当然了,twisted还有其它可以引入reactor的方法。例如,可以使用twisted.internet.pollreactor中的系统调用来poll来代替select方法。

若使用其它的reactor,需要在引入twisted.internet.reactor前安装它。下面是安装pollreactor的方法:

from twisted.internet import pollreactor
pollreactor.install()

如果你没有安装其它特殊的reactor而引入了twisted.internet.reactor,那么Twisted会根据操作系统安装默认的reactor。正因为如此,习惯性做法不要在最顶层的模块内引入reactor以避免安装默认reactor,而是在你要使用reactor的区域内安装。
下面是使用 pollreactor重写上上面的程序:

from twited.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()

那么reactor是如何实现单例的?来看一下from twisted.internet import reactor做了哪些事情就并明白了。

下面是twisted/internet/reactor.py的部分代码:

# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

注:Python中所有加载到内存的模块都放在sys.modules,它是一个全局字典。当import一个模块时首先会在这个列表中查找是否已经加载了此模块,如果加载了则只是将模块的名字加入到正在调用import的模块的命名空间中。如果没有加载则从sys.path目录中按照模块名称查找模块文件,找到后将模块载入内存,并加入到sys.modules中,并将名称导入到当前的命名空间中。

假如我们是第一次运行from twisted.internet import reactor,因为sys.modules中还没有twisted.internet.reactor,所以会运行reactory.py中的代码,安装默认的reactor。之后,如果导入的话,因为sys.modules中已存在该模块,所以会直接将sys.modules中的twisted.internet.reactor导入到当前命名空间。

default中的install:

# twisted/internet/default.py
def _getInstallFunction(platform):
 """
 Return a function to install the reactor most suited for the given platform.
 
 @param platform: The platform for which to select a reactor.
 @type platform: L{twisted.python.runtime.Platform}
 
 @return: A zero-argument callable which will install the selected
 reactor.
 """
 try:
 if platform.isLinux():
 try:
 from twisted.internet.epollreactor import install
 except ImportError:
 from twisted.internet.pollreactor import install
 elif platform.getType() == 'posix' and not platform.isMacOSX():
 from twisted.internet.pollreactor import install
 else:
 from twisted.internet.selectreactor import install
 except ImportError:
 from twisted.internet.selectreactor import install
 return install
 
 
install = _getInstallFunction(platform)

很明显,default中会根据平台获取相应的install。Linux下会首先使用epollreactor,如果内核还不支持,就只能使用pollreactor。Mac平台使用pollreactor,windows使用selectreactor。每种install的实现差不多,这里我们抽取selectreactor中的install来看看。

# twisted/internet/selectreactor.py:
def install():
 """Configure the twisted mainloop to be run using the select() reactor.
 """
 # 单例
 reactor = SelectReactor()
 from twisted.internet.main import installReactor
 installReactor(reactor)
 
# twisted/internet/main.py:
def installReactor(reactor):
 """
 Install reactor C{reactor}.
 
 @param reactor: An object that provides one or more IReactor* interfaces.
 """
 # this stuff should be common to all reactors.
 import twisted.internet
 import sys
 if 'twisted.internet.reactor' in sys.modules:
 raise error.ReactorAlreadyInstalledError("reactor already installed")
 twisted.internet.reactor = reactor
 sys.modules['twisted.internet.reactor'] = reactor

在installReactor中,向sys.modules添加twisted.internet.reactor键,值就是再install中创建的单例reactor。以后要使用reactor,就会导入这个单例了。

SelectReactor
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implementer表示SelectReactor实现了IReactorFDSet接口的方法,这里用到了zope.interface,它是python中的接口实现,有兴趣的同学可以去看下。

IReactorFDSet接口主要对描述符的获取、添加、删除等操作的方法。这些方法看名字就能知道意思,所以我就没有加注释。

# twisted/internet/interfaces.py
class IReactorFDSet(Interface):
 
 def addReader(reader):
 
 def addWriter(writer):
 
 def removeReader(reader):
 
 def removeWriter(writer):
 
 def removeAll():
 
 def getReaders():
 
 def getWriters():
reactor.listenTCP()

示例中的reactor.listenTCP()注册了一个监听事件,它是父类PosixReactorBase中方法。

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
 ReactorBase):
 
 def listenTCP(self, port, factory, backlog=50, interface=''):
 p = tcp.Port(port, factory, backlog, interface, self)
 p.startListening()
 return p
 
# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
 def __init__(self, port, factory, backlog=50, interface='', reactor=None):
 """Initialize with a numeric port to listen on.
 """
 base.BasePort.__init__(self, reactor=reactor)
 self.port = port
 self.factory = factory
 self.backlog = backlog
 if abstract.isIPv6Address(interface):
 self.addressFamily = socket.AF_INET6
 self._addressType = address.IPv6Address
 self.interface = interface
 ...
 
 def startListening(self):
 """Create and bind my socket, and begin listening on it.
 创建并绑定套接字,开始监听。
 
 This is called on unserialization, and must be called after creating a
 server to begin listening on the specified port.
 """
 if self._preexistingSocket is None:
 # Create a new socket and make it listen
 try:
 # 创建套接字
 skt = self.createInternetSocket()
 if self.addressFamily == socket.AF_INET6:
 addr = _resolveIPv6(self.interface, self.port)
 else:
 addr = (self.interface, self.port)
 # 绑定
 skt.bind(addr)
 except socket.error as le:
 raise CannotListenError(self.interface, self.port, le)
 # 监听
 skt.listen(self.backlog)
 else:
 # Re-use the externally specified socket
 skt = self._preexistingSocket
 self._preexistingSocket = None
 # Avoid shutting it down at the end.
 self._shouldShutdown = False
 
 # Make sure that if we listened on port 0, we update that to
 # reflect what the OS actually assigned us.
 self._realPortNumber = skt.getsockname()[1]
 
 log.msg("%s starting on %s" % (
 self._getLogPrefix(self.factory), self._realPortNumber))
 
 # The order of the next 5 lines is kind of bizarre. If no one
 # can explain it, perhaps we should re-arrange them.
 self.factory.doStart()
 self.connected = True
 self.socket = skt
 self.fileno = self.socket.fileno
 self.numberAccepts = 100
 
 # startReading调用reactor的addReader方法将Port加入读集合
 self.startReading()

整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。

reacotr.run()事件主循环

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
 ReactorBase)
 
# twisted/internet/base.py
class _SignalReactorMixin(object):
 
 def startRunning(self, installSignalHandlers=True):
 """
 PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是
 _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。
 """
 self._installSignalHandlers = installSignalHandlers
 ReactorBase.startRunning(self)
 
 def run(self, installSignalHandlers=True):
 self.startRunning(installSignalHandlers=installSignalHandlers)
 self.mainLoop()
 
 def mainLoop(self):
 while self._started:
 try:
 while self._started:
 # Advance simulation time in delayed event
 # processors.
 self.runUntilCurrent()
 t2 = self.timeout()
 t = self.running and t2
 # doIteration是关键,select,poll,epool实现各有不同
 self.doIteration(t)
 except:
 log.msg("Unexpected error in main loop.")
 log.err()
 else:
 log.msg('Main loop terminated.')

mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。

# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):
 
 def __init__(self):
 """
 Initialize file descriptor tracking dictionaries and the base class.
 """
 self._reads = set()
 self._writes = set()
 posixbase.PosixReactorBase.__init__(self)
 
 def doSelect(self, timeout):
 """
 Run one iteration of the I/O monitor loop.
 
 This will run all selectables who had input or output readiness
 waiting for them.
 """
 try:
 # 调用select方法监控读写集合,返回准备好读写的描述符
 r, w, ignored = _select(self._reads,
 self._writes,
 [], timeout)
 except ValueError:
 # Possibly a file descriptor has gone negative?
 self._preenDescriptors()
 return
 except TypeError:
 # Something *totally* invalid (object w/o fileno, non-integral
 # result) was passed
 log.err()
 self._preenDescriptors()
 return
 except (select.error, socket.error, IOError) as se:
 # select(2) encountered an error, perhaps while calling the fileno()
 # method of a socket. (Python 2.6 socket.error is an IOError
 # subclass, but on Python 2.5 and earlier it is not.)
 if se.args[0] in (0, 2):
 # windows does this if it got an empty list
 if (not self._reads) and (not self._writes):
 return
 else:
 raise
 elif se.args[0] == EINTR:
 return
 elif se.args[0] == EBADF:
 self._preenDescriptors()
 return
 else:
 # OK, I really don't know what's going on. Blow up.
 raise
 
 _drdw = self._doReadOrWrite
 _logrun = log.callWithLogger
 for selectables, method, fdset in ((r, "doRead", self._reads),
 (w,"doWrite", self._writes)):
 for selectable in selectables:
 # if this was disconnected in another thread, kill it.
 # ^^^^ --- what the !@#*? serious! -exarkun
 if selectable not in fdset:
 continue
 # This for pausing input when we're not ready for more.
 
 # 调用_doReadOrWrite方法
 _logrun(selectable, _drdw, selectable, method)
 
 doIteration = doSelect
 
 def _doReadOrWrite(self, selectable, method):
 try:
 # 调用method,doRead或者是doWrite,
 # 这里的selectable可能是我们监听的tcp.Port
 why = getattr(selectable, method)()
 except:
 why = sys.exc_info()[1]
 log.err()
 if why:
 self._disconnectSelectable(selectable, why, method=="doRead")

那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。

# twisted/internet/tcp.py
 
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
 
 def doRead(self):
 """Called when my socket is ready for reading.
 当套接字准备好读的时候调用
 
 This accepts a connection and calls self.protocol() to handle the
 wire-level protocol.
 """
 try:
 if platformType == "posix":
 numAccepts = self.numberAccepts
 else:
 numAccepts = 1
 for i in range(numAccepts):
 if self.disconnecting:
 return
 try:
 # 调用accept
 skt, addr = self.socket.accept()
 except socket.error as e:
 if e.args[0] in (EWOULDBLOCK, EAGAIN):
 self.numberAccepts = i
 break
 elif e.args[0] == EPERM:
 continue
 elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
 log.msg("Could not accept new connection (%s)" % (
 errorcode[e.args[0]],))
 break
 raise
 
 fdesc._setCloseOnExec(skt.fileno())
 protocol = self.factory.buildProtocol(self._buildAddr(addr))
 if protocol is None:
 skt.close()
 continue
 s = self.sessionno
 self.sessionno = s+1
 # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备
 # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了
 transport = self.transport(skt, protocol, addr, self, s, self.reactor)
 protocol.makeConnection(transport)
 else:
 self.numberAccepts = self.numberAccepts+20
 except:
 log.deferr()

doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。

Connection是Server(transport实例的类)的父类,它实现了doRead方法。

# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
 _AbortingMixin):
 
 def doRead(self):
 try:
 # 接收数据
 data = self.socket.recv(self.bufferSize)
 except socket.error as se:
 if se.args[0] == EWOULDBLOCK:
 return
 else:
 return main.CONNECTION_LOST
 
 return self._dataReceived(data)
 
 def _dataReceived(self, data):
 if not data:
 return main.CONNECTION_DONE
 # 调用我们自定义protocol的dataReceived方法处理数据
 rval = self.protocol.dataReceived(data)
 if rval is not None:
 offender = self.protocol.dataReceived
 warningFormat = (
 'Returning a value other than None from %(fqpn)s is '
 'deprecated since %(version)s.')
 warningString = deprecate.getDeprecationWarningString(
 offender, versions.Version('Twisted', 11, 0, 0),
 format=warningFormat)
 deprecate.warnAboutFunction(offender, warningString)
 return rval

_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。

至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。

更多详解Python的Twisted框架中reactor事件管理器的用法相关文章请关注PHP中文网!

文档

详解Python的Twisted框架中reactor事件管理器的用法

详解Python的Twisted框架中reactor事件管理器的用法:铺垫在大量的实践中,似乎我们总是通过类似的方式来使用异步编程:监听事件事件发生执行对应的回调函数回调完成(可能产生新的事件添加进监听队列)回到1,监听事件因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似
推荐度:
标签: 使用 事件 python
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top