获取新浪微博1000w用户及最近50条微博的Python脚本
2012-09-12 09:22:47 来源:我爱运维网 评论:0 点击:
功能:获取新浪微博1000万用户的基本信息,以及每个被爬取用户最近发表的50条微博。脚本使用Python编写,多进程爬取,并将数据存储在MongoDB中。
功能:获取新浪微博1000万用户的基本信息,以及每个被爬取用户最近发表的50条微博。脚本使用Python编写,多进程爬取,并将数据存储在MongoDB中。
[代码] [Python]代码
001 | #!/usr/bin/python |
002 | #-*-coding:utf8-*- |
003 |
004 | from pprint import pprint |
005 | from weibopy.auth import OAuthHandler |
006 | from weibopy.api import API |
007 | from weibopy.binder import bind_api |
008 | from weibopy.error import WeibopError |
009 | import time,os,pickle,sys |
010 | import logging.config |
011 | from multiprocessing import Process |
012 | from pymongo import Connection |
013 |
014 |
# Host and port passed to pymongo's Connection by Sina_reptile.__init__.
mongo_addr, mongo_port = 'localhost', 27017
# Name of the database whose collections receive the crawled data.
db_name = 'weibo'
018 |
class Sina_reptile():
    """
    Crawl Sina Weibo data through weibopy and store it in MongoDB.

    Documents are written to the ``userprofile`` and ``statuses``
    collections of the database named by the module-level ``db_name``.
    Call :meth:`auth` (interactive) or :meth:`setToken` before using any
    method that reads ``self.api``.
    """

    # Attributes copied one-to-one from the object returned by api.get_user.
    _USER_FIELDS = (
        "screen_name", "name", "province", "city", "location",
        "description", "url", "profile_image_url", "domain", "gender",
        "followers_count", "friends_count", "statuses_count",
        "favourites_count", "created_at", "following",
        "allow_all_act_msg", "geo_enabled", "verified",
    )

    # Status attributes copied one-to-one; "geo" and "retweeted_status"
    # are handled separately by each method.
    _STATUS_FIELDS = (
        "created_at", "text", "source", "favorited", "truncated",
        "in_reply_to_status_id", "in_reply_to_user_id",
        "in_reply_to_screen_name", "thumbnail_pic", "bmiddle_pic",
        "original_pic", "mid",
    )

    def __init__(self, consumer_key, consumer_secret):
        """Store the application credentials and open the MongoDB collections."""
        self.consumer_key, self.consumer_secret = consumer_key, consumer_secret
        self.connection = Connection(mongo_addr, mongo_port)
        self.db = self.connection[db_name]
        self.collection_userprofile = self.db['userprofile']
        self.collection_statuses = self.db['statuses']

    def getAtt(self, key):
        """
        Return attribute ``key`` of ``self.obj``.

        Best-effort: on any error (including ``self.obj`` not being set)
        the error is printed and an empty string is returned.
        """
        try:
            return getattr(self.obj, key)
        except Exception as e:
            print(e)
            return ''

    def getAttValue(self, obj, key):
        """
        Return attribute ``key`` of ``obj``.

        Best-effort: on any error the error is printed and an empty string
        is returned.
        """
        try:
            return getattr(obj, key)
        except Exception as e:
            print(e)
            return ''

    def auth(self):
        """
        Interactively obtain the Sina Weibo access token and secret.

        Prints the authorization URL, reads the PIN from standard input and
        builds ``self.api``.  Returns None without doing anything when the
        consumer key or the consumer secret is empty.
        """
        if not self.consumer_key:
            print("Please set consumer_key")
            return

        # The original tested consumer_key a second time here, so an empty
        # secret was never detected.
        if not self.consumer_secret:
            print("Please set consumer_secret")
            return

        # NOTE: assigning self.auth replaces this method on the instance, so
        # auth() can only be called once per object.  The attribute name is
        # kept because setToken() exposes the same attribute.
        self.auth = OAuthHandler(self.consumer_key, self.consumer_secret)
        auth_url = self.auth.get_authorization_url()
        print('Please authorize: ' + auth_url)
        # raw_input: this script targets Python 2.
        verifier = raw_input('PIN: ').strip()
        self.auth.get_access_token(verifier)
        self.api = API(self.auth)

    def setToken(self, token, tokenSecret):
        """
        Build ``self.api`` from an already issued OAuth token and secret.
        """
        self.auth = OAuthHandler(self.consumer_key, self.consumer_secret)
        self.auth.setToken(token, tokenSecret)
        self.api = API(self.auth)

    def get_userprofile(self, id):
        """
        Fetch the basic profile of user ``id``.

        Returns a dict holding ``id`` plus every attribute listed in
        ``_USER_FIELDS`` ('' for attributes the API object lacks), or None
        when the API call raises WeibopError (the error is printed and
        logged).
        """
        try:
            user = self.api.get_user(id)
        except WeibopError as e:
            # The details of the failure are carried by the exception object.
            print("error occured when access userprofile use user_id: %s" % (id,))
            print("Error: %s" % (e,))
            log.error("Error occured when access userprofile use user_id:{0}\nError:{1}".format(id, e), exc_info=sys.exc_info())
            return None

        self.obj = user
        userprofile = {'id': id}
        for field in self._USER_FIELDS:
            userprofile[field] = self.getAtt(field)
        return userprofile

    def get_specific_weibo(self, id):
        """
        Fetch the single status whose id is ``id``.

        Returns a dict of the status attributes, or the string
        "**绑定错误**" when the API call could not be bound.
        """
        try:
            # Re-bind the status lookup call.
            get_status = bind_api(path='/statuses/show/{id}.json',
                                  payload_type='status',
                                  allowed_param=['id'])
        except Exception:
            return "**绑定错误**"

        self.obj = get_status(self.api, id)
        statusprofile = {'id': id}
        for field in self._STATUS_FIELDS:
            statusprofile[field] = self.getAtt(field)
        statusprofile['geo'] = self.getAtt("geo")
        statusprofile['retweeted_status'] = self.getAtt("retweeted_status")
        return statusprofile

    def get_latest_weibo(self, user_id, count):
        """
        Fetch the ``count`` most recent statuses of ``user_id``.

        Returns a list with one dict per status, or None when the timeline
        call raises (the error is printed and logged).  ``geo`` and
        ``retweeted_status`` are stored as the repr of their pickled form.
        """
        try:  # errors can be raised from inside the SDK
            timeline = self.api.user_timeline(count=count, user_id=user_id)
        except Exception as e:
            print("error occured when access status use user_id: %s" % (user_id,))
            print("Error: %s" % (e,))
            log.error("Error occured when access status use user_id:{0}\nError:{1}".format(user_id, e), exc_info=sys.exc_info())
            return None

        statuses = []
        for line in timeline:
            self.obj = line
            # A fresh dict per status: re-using one dict made every list
            # entry alias the last status.
            statusprofile = {'usr_id': user_id, 'id': self.getAtt("id")}
            for field in self._STATUS_FIELDS:
                statusprofile[field] = self.getAtt(field)
            statusprofile['geo'] = repr(pickle.dumps(self.getAtt("geo"), pickle.HIGHEST_PROTOCOL))
            statusprofile['retweeted_status'] = repr(pickle.dumps(self.getAtt("retweeted_status"), pickle.HIGHEST_PROTOCOL))
            statuses.append(statusprofile)

        return statuses

    def friends_ids(self, id):
        """
        Return the ids of all users that ``id`` follows.

        Pages through api.friends_ids starting at cursor 0 and stops as soon
        as the returned ``next_cursor`` is 0 or missing.
        """
        ids = []
        cursor = 0
        while True:
            self.obj = self.api.friends_ids(user_id=id, cursor=cursor)
            ids.extend(self.getAtt("ids"))
            cursor = self.getAtt("next_cursor")
            # getAtt yields '' for a missing attribute; treating that like 0
            # avoids requesting the same page forever.
            if not cursor:
                break
        return ids

    def manage_access(self):
        """
        Throttle API usage by sleeping according to the current rate limit.

        Sleeps reset_time_in_seconds / remaining_hits (rounded to two
        decimals) plus 1.5 seconds; when no hits remain it sleeps
        reset_time_in_seconds plus 1.5 seconds.
        """
        self.obj = self.api.rate_limit_status()
        remaining_hits = self.getAtt("remaining_hits")
        reset_seconds = self.getAtt("reset_time_in_seconds")
        if remaining_hits:
            sleep_time = round(float(reset_seconds) / remaining_hits, 2)
        else:
            sleep_time = reset_seconds
        print("%s %s %s %s" % (remaining_hits, reset_seconds,
                               self.getAtt("hourly_limit"),
                               self.getAtt("reset_time")))
        print("sleep time: %s pid: %s" % (sleep_time, os.getpid()))
        time.sleep(sleep_time + 1.5)

    def save_data(self, userprofile, statuses):
        """
        Insert ``statuses`` (a list of dicts) and ``userprofile`` into MongoDB.

        An empty ``statuses`` list is skipped so that the profile of a user
        without statuses is still stored.
        """
        if statuses:
            self.collection_statuses.insert(statuses)
        self.collection_userprofile.insert(userprofile)
208 |
def reptile(sina_reptile, userid, max_users=10000000):
    """
    Crawl the follow graph breadth-first, starting from ``userid``.

    For every user on the current frontier the crawler throttles itself,
    fetches the ids the user follows, the user's profile and latest 50
    statuses, and stores profile and statuses.  Users whose profile or
    statuses could not be fetched are skipped; any other error is logged
    and followed by a 60 second pause.

    sina_reptile -- object providing manage_access, friends_ids,
                    get_userprofile, get_latest_weibo and save_data
    userid       -- id of the seed user
    max_users    -- stop after this many users have been stored

    Returns None once ``max_users`` users were stored or no unvisited
    user is left.
    """
    stored = 0
    # Ids that were already queued; prevents crawling a user twice.
    seen = set([userid])
    frontier = [userid]
    # An empty frontier ends the crawl instead of spinning forever.
    while frontier and stored < max_users:
        next_frontier = []
        for user_id in frontier:
            try:
                sina_reptile.manage_access()
                friend_ids = sina_reptile.friends_ids(user_id)
                userprofile = sina_reptile.get_userprofile(user_id)
                statuses = sina_reptile.get_latest_weibo(count=50, user_id=user_id)
                if statuses is None or userprofile is None:
                    continue
                sina_reptile.save_data(userprofile, statuses)
            except Exception as e:
                log.error("Error occured in reptile,id:{0}\nError:{1}".format(user_id, e), exc_info=sys.exc_info())
                time.sleep(60)
                continue
            stored += 1
            print(stored)
            if stored >= max_users:
                return
            for friend_id in friend_ids:
                if friend_id not in seen:
                    seen.add(friend_id)
                    next_frontier.append(friend_id)
        frontier = next_frontier
232 |
def run_crawler(consumer_key, consumer_secret, key, secret, userid):
    """
    Entry point of one crawler process.

    Builds a Sina_reptile from the application credentials, installs the
    OAuth token (``key`` / ``secret``) and crawls starting from ``userid``.
    Any exception is printed and logged rather than propagated; the MongoDB
    connection is closed in every case once it has been opened.
    """
    sina_reptile = None
    try:
        sina_reptile = Sina_reptile(consumer_key, consumer_secret)
        sina_reptile.setToken(key, secret)
        reptile(sina_reptile, userid)
    except Exception as e:
        print(e)
        # Indices must be {0}/{1}: the original {1}/{2} raised IndexError
        # inside the handler and hid the real error.
        log.error("Error occured in run_crawler,pid:{0}\nError:{1}".format(os.getpid(), e), exc_info=sys.exc_info())
    finally:
        if sina_reptile is not None:
            sina_reptile.connection.close()
242 |
if __name__ == "__main__":
    logging.config.fileConfig("logging.conf")
    # `log` is read as a module-level global by reptile, run_crawler and
    # the Sina_reptile methods.
    log = logging.getLogger('logger_sina_reptile')
    # Each line of test.txt supplies the five run_crawler arguments:
    # consumer_key consumer_secret key secret userid
    with open('test.txt') as f:
        for line_no, line in enumerate(f, 1):
            fields = line.split()
            if not fields:
                # Blank lines (e.g. a trailing newline) are not crawler specs.
                continue
            if len(fields) < 5:
                log.warning("Skipping line {0} of test.txt: expected 5 fields, got {1}".format(line_no, len(fields)))
                continue
            p = Process(target=run_crawler, args=tuple(fields[:5]))
            p.start()
上一篇:用python程序连接hive
下一篇:数据需求统计常用shell命令
分享到:
收藏
评论排行
- ·Windows(Win7)下用Xming...(92)
- ·使用jmx client监控activemq(20)
- ·Hive查询OOM分析(14)
- ·复杂网络架构导致的诡异...(8)
- ·使用 OpenStack 实现云...(7)
- ·影响Java EE性能的十大问题(6)
- ·云计算平台管理的三大利...(6)
- ·Mysql数据库复制延时分析(5)
- ·OpenStack Nova开发与测...(4)
- ·LTPP一键安装包1.2 发布(4)
- ·Linux下系统或服务排障的...(4)
- ·PHP发布5.4.4 和 5.3.1...(4)
- ·RSYSLOG搭建集中日志管理服务(4)
- ·转换程序源码的编码格式[...(3)
- ·Linux 的木马程式 Wirenet 出现(3)
- ·Nginx 发布1.2.1稳定版...(3)
- ·zend framework文件读取漏洞分析(3)
- ·Percona Playback 0.3 development release(3)
- ·运维业务与CMDB集成关系一例(3)
- ·应该知道的Linux技巧(3)