このBlogは移転しました。今後は aish.dev を御覧ください。

Twisted で reactor のループをブロックせずにクライアントにファイルを転送するツールを書いてみた。

Twisted にはファイル転送のための FileSender があるが、reactor のループ中にファイルの読み込みを行うため、その間、他のリクエストを処理できずに応答性が低下してしまうケースがある。以下の NonblockingFileSender では ファイルの読み込みを別スレッドで行うため、ファイルIOを行いつつちょっとしたリクエストにも応答できるようになっている。また、読み込んだデータをクライアントに送出した後、データの送信を待たずに次に送信する分のデータを自動的に読み込むため、効率的にファイルの転送が行えるようになっている。

from zope.interface import implements
from twisted.internet import threads, defer, reactor, interfaces, protocol
from twisted.protocols import basic

class NonblockingSender:
    implements(interfaces.IProducer)
    
    deferred = None
    _loading = False
    _waiting = False
    _buf = None
    
    def beginFileTransfer(self, consumer):
        self.consumer = consumer

        self.deferred = deferred = defer.Deferred()
        self.consumer.registerProducer(self, False)
        return deferred

    def _loadData(self):
        if self._loading:
            return
        if not self.deferred:
            return

        self._loading = True
        d = self.readbytes(self.CHUNK_SIZE)

        def dataarrived(chunk):
            self._loading = False
            
            if chunk is None:
                self.consumer.unregisterProducer()
                if self.deferred:
                    self.deferred.callback(True)
                    self.deferred = None
            else:
                assert self._buf is None
                self._buf = chunk
                if self._waiting:
                    self.resumeProducing()
            
        def onerr(reason):
            self._loading = False
            if self.deferred:
                self.deferred.errback(reason)
                self.deferred = None
            self.consumer.unregisterProducer()
            
        d.addCallback(dataarrived)
        d.addErrback(onerr)
        
        return d
        
    def resumeProducing(self):
        if self._buf:
            self.consumer.write(self._buf)
            self._buf = None
            self._waiting = False
        else:
            self._waiting = True

        self._loadData()

    def pauseProducing(self):
        pass

    def stopProducing(self):
        if self.deferred:
            self.deferred.errback(Exception("Consumer asked us to stop producing"))
            self.deferred = None

class NonblockingFileSender(NonblockingSender):
    CHUNK_SIZE = 1024*1024 # 1 MBytes

    def __init__(self, filename, chunksize=None):
        self._file = open(filename, "rb")
        if chunksize: self.CHUNK_SIZE = chunksize
        
    def readbytes(self, n):
        def readfile():
            ret = self._file.read(n)
            if not ret:
                return None
            return ret
        
        return threads.deferToThread(readfile)

def run():
    class NBProtocol(basic.LineReceiver):
        def lineReceived(self, n):
            s = NonblockingFileSender(n, chunksize=1024*1024)
            d = s.beginFileTransfer(self.transport)
            
            d.addCallback(self._finished)
            d.addErrback(self._err)
        
        def _finished(self, result):
            self.transport.loseConnection()

        def _err(self, reason):
            print "Failed", reason
            self.transport.loseConnection()
            
            
    class NBFactory(protocol.ServerFactory):
        protocol = NBProtocol

    reactor.listenTCP(10000, NBFactory())

run()

というのを書いたので使ってみたが、あんまり効果を体感できなかった。一応、高速化はされるけど誤差に毛が生えた程度 :-( 。手元で用意できる環境ではこういったファイル転送のボトルネックはネットワーク部分であり、ここが飽和していてはサーバで何を工夫しても当然無駄である。ディスクの転送速度 << ネットワーク転送速度な環境であれば結構な効果があるとは思うが、その辺に転がっていた中古PCをちょっとサーバにしてあいてるHubに刺してあるような環境ではあんまり意味がない。

ネットワークの転送速度を無視できるように、localhostにクライアントとサーバを両方実行してテストすると、Twisted の FileSender に比べて転送速度・応答速度ともに大きく改善された。どのくらい改善されるかはファイルのサイズやファイル読み込みのブロックサイズ、並列実行されるリクエストの数によって大きく異なるが、おおむね1.3〜10倍程度にはなるようだ。

ということで、正直、それなりに本格的なネットワークアプリケーション以外ではわざわざNon-blockingなファイル転送とか考えず、 basic.FileSender を使っておくのが適切だと思う。ディスクもプラットフォームのOSもファイルシステムもどんどん賢くなっていて、単純なシーケンシャルリードぐらいではそれほど時間はかからなくなっているようだ。こんなチューニングをしている暇があったら、賢いストレージの購入予算をひねり出す算段でもしていた方が有益かも知れない。