あけましておめでとうございます。というエントリを書こうとしてたんだけどいいネタがなくてこのままでは2月になってしまうというところでなんでもいいから書こうとなっています。
httpでストリーミング的にデータを垂れ流してくるようなサーバがあってそこから適当にデータをダウンロードしつつ処理をしたい。できればpythonで、という相談を受けたのでasyncioってやつを使ってみたのをメモっときたいと思いました。
まず無限に応答を返すようなhttpサーバを建てる必要があった。
- MacOSでは標準で搭載されているapacheを有効にしてcgiを動かした。このへんは他のWebサイトにもいくらでも載ってるので割愛である。/Library/WebServer/CGI-Executables/ にcgiスクリプトを入れたらアクセスできるようになる。
- 下記の無限に数字が出力されつづけるinfinity.plを上記ディレクトリに配置した。IO::Handleを使ってautoflush(1)をしているところがポイントで、これをやらないとバッファリングされてしまってタイミングよくクライアントに応答が送られない感じになる。
#!/opt/local/bin/perl
use CGI;
use IO::Handle;
STDOUT->autoflush(1);
my $q = new CGI;
print $q->header();
for (my $n = 0; ; $n++) {
print "$n\n";
sleep(1);
}
- ブラウザではよくわからないのでcurlで出力してみるとちゃんと1秒毎に数字が表示されて動いていた。
なんでperlなのか、pythonっていってたじゃんかと思われるかもしないが、それはこのへんは主題じゃないから。次が本番のpythonのクライアント
- aiohttpってやつを使えば非同期でhttpのリクエストを出してレスポンスを受けとれる。
import aiohttp
import asyncio
async def main() :
async with aiohttp.ClientSession() as session:
async with session.get('http://localhost/cgi-bin/test.pl') as resp:
print(resp.status)
print(await resp.text())
asyncio.run(main());
これはいろんなWebページに良く載ってるやつで、これをTaskにして複数個並列に実行させてスクレイピングをやるよみたいな例でよく出ている。でもこれだとresp.textをawaitしているので結局最後まで受信しないとテキストが出力されない。 - 無限に応答を返しつづけるようなサーバに対しては、
https://docs.aiohttp.org/en/stable/client_quickstart.html
の、「Streaming Response Content」のセクションに解説があった。下記のコードで…つまりresp.text()とかではなくresp.content.read(20)で受けるとサーバから受信したタイミングで処理ができる。
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('http://localhost/cgi-bin/infinity.pl') as resp:
while (1):
print(await resp.content.read(20))
asyncio.run(main())
- read(20)でやってみたんだけど、これは
https://docs.python.org/ja/3.6/library/asyncio-stream.html#asyncio.StreamReader
のreadメソッドで引数は一度に読みとるbyte数だった。read(10000)にしても動作は変わらなかったので、受信したタイミングで受信する最大バイト数を指定しており、それより小さいデータしか受信できてなくても応答が返ってくるようだった。あたりまえだがこれをread(1)にすると1バイトずつしか取ってくれなかった。 - 受信中に他の処理をしたい場合にはasyncioのcreate_taskを使う必要があるとのことだった。3秒経ったら止めるフラグを立てる下記のようなコードを作って確認した。
import aiohttp
import asyncio
cont = True
async def timeout(duration):
await asyncio.sleep(duration)
global cont
cont = False
async def connect():
global cont
async with aiohttp.ClientSession() as session:
async with session.get('http://localhost/cgi-bin/infinity.pl') as resp:
while (cont):
print(await resp.content.read(20))
async def main():
t1 = asyncio.create_task(connect())
t2 = asyncio.create_task(timeout(3))
await t1
await t2
asyncio.run(main())
- 実際のところ「3秒だけ情報を取得する」みたいなのだったらもっと他にやりかたいっぱいあるだろという気がするがいろんなやりかたを試しておきたかった。
- main()と、taskのawaitはわりと理解しづらい気がした。たとえば上のコードでは「await t2」は無くてもいい、いやむしろ無いほうが良いくらいなのだけど、そういうのはWebで出てきたサンプルをちょろちょろコピペして「やったー動いたー」ってやってるとわからない気がする。
ストリーミングを説明してくれているサイトでいいところがあるのかよくわからなかったので公式っぽいところを参考にしてみたんだけど動作するものはわりと簡単に作れた。一方でpythonのコルーチンとかスレッドのモデルは、asyncio.run(main())のところはわりと面白いなと思う一方で、いまいちまだ自分の中であまりちゃんと理解ができてない感じなのでもうちょっとがんばってドキュメント(https://docs.python.org/ja/3/library/asyncio-task.html#)とリファレンスを読んでいく必要がありそう。