Link Search Menu Expand Document

サマリ:AWS EC2を使ってみました

Coffee Break〜AWS EC2でTwitter Streamを取得してAWS ESにデータを入れてみる〜

概要

スロットを作っています。なので、スロットっていう単語でどういうのがトレンドなのか知りたいのです。

環境

開発環境はmacです。なので、必要なライブラリはbrewとpipで入れます。そうです。言語はpythonで実装してます。と言っても、ようやく最近触り始めた俄かです。

ソースコード

なんか、いろいろ情報をあさっていると、デーモン化しておかないとプロセスが勝手に死ぬらしいので、やっつけでやっておきました。あと、ローカルではきちんとloggingは動いているのですが、awsに持って行くとファイルディスクリプタエラーで正常に記録されないので、awsではコメントアウトしてます。

循環リストにidを保存しているのは、twitter timeline(APIの要らない方)で取得していた時の名残です。


from pytz import timezone
from dateutil import parser
from datetime import datetime
from elasticsearch import Elasticsearch
from twitter import Twitter, TwitterStream, OAuth
from threading import Timer, get_ident

from daemon import daemon
from lockfile.pidlockfile import PIDLockFile

import logging
import logging.handlers
import re

#-START- log setting -----
logging.basicConfig(filename='./twitter.log', level=logging.DEBUG)
logger = logging.getLogger("twitter")    #logger名loggerを取得
logger.setLevel(logging.DEBUG)

#ハンドラを作成
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter( "TWITTER, %(asctime)s %(levelname)8s %(message)s" ))
#loggerにハンドラを設定
logger.addHandler(handler)
logger.info("START Getting tweets")
#-ENDOF- log setting -----

# https://apps.twitter.com/ でTwitterアプリを登録して取得したOAuthキーを設定。
OAUTH_INFO = dict(
    token="0000000000-abcdefghijklmnopqrstuvwxyz0123456789ABC",
    token_secret="abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHI",
    consumer_key="abcdefghijklmnopqrtstuvwx",
    consumer_secret="abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN")

STREAM_INFO = dict(
    timeout=600,
    block=False,
    heartbeat_timeout=600) # デフォルトだと90秒でストリーム維持のためのheartbeatが飛んで来るので10分に設定

JST = timezone('Asia/Tokyo')
WOEID_JP = 23424856 # 日本のWOEID

class TwitterTrendStream():

    def __init__(self):
        self.__current_thread_ident = None
        self.__oauth = OAuth(**OAUTH_INFO)
        self.__es = Elasticsearch(host='search-abcdefg-xxxxxxxxxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com', port=80)

        #-----循環参照用index
        self.__circular_list_index = 0
        #-----循環参照用list
        self.__circular_list = []

    #-----循環参照用index 更新
    def __update_index(self):
        self.__circular_list_index = (self.__circular_list_index + 1) % 10000
        return;

    #-----循環参照用list 更新
    def __update_list(self, _id):
        if len(circular_list) < 10000:
            self.__circular_list.append(_id)
        else:
            self.__circular_list[self.__circular_list_index] = _id
            self.__update_index()
        return;

    def __fetch_filter_stream(self, twitter_stream, track_list):
        track = ",".join(track_list)
        return twitter_stream.statuses.filter(track=track)

    def run(self):
        self.__current_thread_ident = get_ident() # 現在の実行スレッドIDを登録
        Timer(300, self.run).start()              # 5分後に新たなスレッドを開始

        twitter = Twitter(auth=self.__oauth)
        twitter_stream = TwitterStream(auth=self.__oauth, **STREAM_INFO)

        trend_list = ['スロット', '回胴', 'パチスロ', '#スロット', '#回胴', '#パチスロ', '#アニメ']
        tweet_iter = self.__fetch_filter_stream(twitter_stream, trend_list)

        for tweet in tweet_iter:
            if "limit" in tweet: # 取得上限超えた時にくるLimit Jsonは無視
                continue
            logger.info("{0}\t{1}".format("[success]", "lists are get"))

            if self.__current_thread_ident != get_ident(): # 新たなスレッドが立ち上がったら現在のストリームを終了させる
                return True
            for trend in trend_list:
                if trend in tweet['text']:

                    # 循環参照に含まれている場合はスキップ
                    if tweet['id'] in circular_list:
                        continue
                    # 循環参照に追加
                    self.__update_list(tweet['id'])

                    # HTMLタグ/URLを削除
                    summary = tweet['text']
                    summary = re.sub("<.*?>", "", summary)
                    summary = re.sub("https?://[\w/:%#\$&\?\(\)~\.=\+\-]+", "", summary)

                    doc = {
                        'track': trend,
                        'text': tweet['text'],
                        'tweet': summary,
                        'created_at': str(parser.parse(tweet['created_at']).astimezone(JST).isoformat()),
                        'id': tweet['id']
                    }
                    self.__es.index(index="slot", doc_type='tweet', body=doc)
                    logger.info("{0}\t{1}".format("[success]", tweet['id']))

if __name__ == '__main__':
    with daemon.DaemonContext(
            pidfile=PIDLockFile('/tmp/daemon.pid'),
            stderr=open('stderr.txt', 'w+')
            ):
        TwitterTrendStream().run()

課題

非常にノイズが多いです。解析にも響いているので、どうやって削るか検討中です。

以上。

春麗らかかと思えば寒い日が続いています。風邪をひかないよう気をつけねば。


Back to top

Copyright © 2021- Akira Otaka All rights reserved.