Python实现获取微信企业号access_token的Class

简介:

    微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效。而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。为兼容这两种方式,因此按照订阅号的方式处理。

    处理办法与接口文档中的要求相同:

    为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。

    下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。

    设计思路和使用方法:

  1. 自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据

  2. 尽可能的保证Class中每一个可执行的函数单独调用都能成功。

  3. Class中只将真正能被用到的方法和变量设置为public的。

  4. 使用时只需要修改此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使用WeiXinTokenClass().get()方法即可得到access_token。

脚本内容可以从github上获取,地址:https://github.com/DingGuodong/LinuxBashShellScriptForOps/blob/master/projects/WeChatOps/OpsDevBestPractice/odbp_getToken.py

脚本内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/python
# encoding: utf-8
# -*- coding: utf8 -*-
"""
Created by PyCharm.
File:               LinuxBashShellScriptForOps:odbp_getToken.py
User:               Guodong
Create Date:        2016/8/10
Create Time:        17:04
  """
 
import  os
import  sqlite3
import  sys
import  urllib
import  urllib2
import  json
import  datetime
 
# import time
 
enable_debug  =  True
 
 
def  debug(msg, code = None ):
     if  enable_debug:
         if  code  is  None :
             print  "message: %s"  %  msg
         else :
             print  "message: %s, code: %s "  %  (msg, code)
 
 
AUTHOR_MAIL  =  "uberurey_ups@163.com"
 
weixin_qy_CorpID  =  "your_corpid"
weixin_qy_Secret  =  "your_secret"
 
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR  =  os.path.dirname(os.path.abspath(__file__))
 
# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
 
DATABASES  =  {
     'default' : {
         'ENGINE' 'db.backends.sqlite3' ,
         'NAME' : os.path.join(BASE_DIR,  '.odbp_db.sqlite3' ),
     }
}
 
sqlite3_db_file  =  str (DATABASES[ 'default' ][ 'NAME' ])
 
 
def  sqlite3_conn(database):
     try :
         conn  =  sqlite3.connect(database)
     except  sqlite3.Error:
         print  >> sys.stderr,  """\
     There was a problem connecting to Database:
 
         %s
 
     The error leading to this problem was:
 
         %s
 
     It's possible that this database is broken or permission denied.
 
     If you cannot solve this problem yourself, please mail to:
 
         %s
 
     """  %  (database, sys.exc_value, AUTHOR_MAIL)
         sys.exit( 1 )
     else :
         return  conn
 
 
def  sqlite3_commit(conn):
     return  conn.commit()
 
 
def  sqlite3_close(conn):
     return  conn.close()
 
 
def  sqlite3_execute(database, sql):
     try :
         sql_conn  =  sqlite3_conn(database)
         sql_cursor  =  sql_conn.cursor()
         sql_cursor.execute(sql)
         sql_conn.commit()
         sql_conn.close()
     except  sqlite3.Error as e:
         print  e
         sys.exit( 1 )
 
 
def  sqlite3_create_table_token():
     sql_conn  =  sqlite3_conn(sqlite3_db_file)
     sql_cursor  =  sql_conn.cursor()
     sql_cursor.execute( '''CREATE TABLE "main"."weixin_token" (
                 "id"  INTEGER ,
                 "access_token"  TEXT,
                 "expires_in"  TEXT,
                 "expires_on"  TEXT,
                 "is_expired"  INTEGER
                 )
                 ;
     ''' )
     sqlite3_commit(sql_conn)
     sqlite3_close(sql_conn)
 
 
def  sqlite3_create_table_account():
     sql_conn  =  sqlite3_conn(sqlite3_db_file)
     sql_cursor  =  sql_conn.cursor()
     sql_cursor.execute( '''CREATE TABLE "main"."weixin_account" (
                 "id"  INTEGER,
                 "name"  TEXT,
                 "corpid"  TEXT,
                 "secret"  TEXT,
                 "current"  INTEGER
                 )
                 ;
     ''' )
     sqlite3_commit(sql_conn)
     sqlite3_close(sql_conn)
 
 
def  sqlite3_create_tables():
     print  "sqlite3_create_tables"
     sql_conn  =  sqlite3_conn(sqlite3_db_file)
     sql_cursor  =  sql_conn.cursor()
     sql_cursor.execute( '''CREATE TABLE "main"."weixin_token" (
                 "id"  INTEGER ,
                 "access_token"  TEXT,
                 "expires_in"  TEXT,
                 "expires_on"  TEXT,
                 "is_expired"  INTEGER
                 )
                 ;
     ''' )
     sql_cursor.execute( '''CREATE TABLE "main"."weixin_account" (
                 "id"  INTEGER,
                 "name"  TEXT,
                 "corpid"  TEXT,
                 "secret"  TEXT,
                 "current"  INTEGER
                 )
                 ;
     ''' )
     sqlite3_commit(sql_conn)
     sqlite3_close(sql_conn)
 
 
def  sqlite3_set_credential(corpid, secret):
     try :
         sql_conn  =  sqlite3_conn(sqlite3_db_file)
         sql_cursor  =  sql_conn.cursor()
         sql_cursor.execute( '''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES
                                 (1,
                                 'odbp',
                                 ?,
                                 ?,
                                 1)
''' , (corpid, secret))
         sqlite3_commit(sql_conn)
         sqlite3_close(sql_conn)
     except  sqlite3.Error:
         sqlite3_create_table_account()
         sqlite3_set_credential(corpid, secret)
 
 
def  sqlite3_set_token(access_token, expires_in, expires_on, is_expired):
     try :
         sql_conn  =  sqlite3_conn(sqlite3_db_file)
         sql_cursor  =  sql_conn.cursor()
         sql_cursor.execute( '''INSERT INTO "weixin_token"
                               ("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES
                               (
                               1,
                               ?,
                               ?,
                               ?,
                               ?
                               )
''' , (access_token, expires_in, expires_on, is_expired))
         sqlite3_commit(sql_conn)
         sqlite3_close(sql_conn)
     except  sqlite3.Error:
         sqlite3_create_table_token()
         sqlite3_set_token(access_token, expires_in, expires_on, is_expired)
 
 
def  sqlite3_get_credential():
     try :
         sql_conn  =  sqlite3_conn(sqlite3_db_file)
         sql_cursor  =  sql_conn.cursor()
         credential  =  sql_cursor.execute( '''SELECT "corpid", "secret"  FROM weixin_account WHERE current == 1;''' )
         result  =  credential.fetchall()
         sqlite3_close(sql_conn)
     except  sqlite3.Error:
         sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)
         return  sqlite3_get_credential()
     else :
         if  result  is  not  None  and  len (result) ! =  0 :
             return  result
         else :
             print  "unrecoverable problem, please alter to %s"  %  AUTHOR_MAIL
             sys.exit( 1 )
 
 
def  sqlite3_get_token():
     try :
         sql_conn  =  sqlite3_conn(sqlite3_db_file)
         sql_cursor  =  sql_conn.cursor()
         credential  =  sql_cursor.execute(
             '''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;''' )
         result  =  credential.fetchall()
         sqlite3_close(sql_conn)
     except  sqlite3.Error:
         info  =  sys.exc_info()
         print  info[ 0 ],  ":" , info[ 1 ]
     else :
         if  result  is  not  None  and  len (result) ! =  0 :
             return  result
         else :
             # print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL
             # sys.exit(1)
             return  None
 
 
def  sqlite3_update_token(access_token, expires_on):
     sql_conn  =  sqlite3_conn(sqlite3_db_file)
     sql_cursor  =  sql_conn.cursor()
     sql_cursor.execute( '''UPDATE "weixin_token" SET
                           access_token=?,
                           expires_on=?
                           WHERE _ROWID_ = 1;''' , (access_token, expires_on)
                        )
     sqlite3_commit(sql_conn)
     sqlite3_close(sql_conn)
 
 
class  WeiXinTokenClass( object ):
     def  __init__( self ):
         self .__corpid  =  None
         self .__corpsecret  =  None
         self .__use_persistence  =  True
 
         self .__access_token  =  None
         self .__expires_in  =  None
         self .__expires_on  =  None
         self .__is_expired  =  None
 
         if  self .__use_persistence:
             self .__corpid  =  sqlite3_get_credential()[ 0 ][ 0 ]
             self .__corpsecret  =  sqlite3_get_credential()[ 0 ][ 1 ]
         else :
             self .__corpid  =  weixin_qy_CorpID
             self .__corpsecret  =  weixin_qy_Secret
 
     def  __get_token_from_weixin_qy_api( self ):
         parameters  =  {
             "corpid" self .__corpid,
             "corpsecret" self .__corpsecret
         }
         url_parameters  =  urllib.urlencode(parameters)
         token_url  =  "https://qyapi.weixin.qq.com/cgi-bin/gettoken?"
         url  =  token_url  +  url_parameters
         response  =  urllib2.urlopen(url)
         result  =  response.read()
         token_json  =  json.loads(result)
         if  token_json[ 'access_token' is  not  None :
             get_time_now  =  datetime.datetime.now()
             # TODO(Guodong Ding) token will expired ahead of time or not expired after the time
             expire_time  =  get_time_now  +  datetime.timedelta(seconds = token_json[ 'expires_in' ])
             token_json[ 'expires_on' =  str (expire_time)
             self .__access_token  =  token_json[ 'access_token' ]
             self .__expires_in  =  token_json[ 'expires_in' ]
             self .__expires_on  =  token_json[ 'expires_on' ]
             self .__is_expired  =  1
 
             try :
                 token_result_set  =  sqlite3_get_token()
             except  sqlite3.Error:
                 token_result_set  =  None
             if  token_result_set  is  None  and  len (token_result_set)  = =  0 :
                 sqlite3_set_token( self .__access_token,  self .__expires_in,  self .__expires_on,  self .__is_expired)
             else :
                 if  self .__is_token_expired()  is  True :
                     sqlite3_update_token( self .__access_token,  self .__expires_on)
                 else :
                     debug( "pass" )
                     return
         else :
             if  token_json[ 'errcode' is  not  None :
                 print  "errcode is: %s"  %  token_json[ 'errcode' ]
                 print  "errmsg is: %s"  %  token_json[ 'errmsg' ]
             else :
                 print  result
 
     def  __get_token_from_persistence_storage( self ):
         try :
             token_result_set  =  sqlite3_get_token()
         except  sqlite3.Error:
             self .__get_token_from_weixin_qy_api()
         finally :
             if  token_result_set  is  None :
                 self .__get_token_from_weixin_qy_api()
                 token_result_set  =  sqlite3_get_token()
                 access_token  =  token_result_set[ 0 ][ 0 ]
                 expire_time  =  token_result_set[ 0 ][ 1 ]
             else :
                 access_token  =  token_result_set[ 0 ][ 0 ]
                 expire_time  =  token_result_set[ 0 ][ 1 ]
         expire_time  =  datetime.datetime.strptime(expire_time,  '%Y-%m-%d %H:%M:%S.%f' )
         now_time  =  datetime.datetime.now()
         if  now_time < expire_time:
             # print "The token is %s" % access_token
             # print "The token will expire on %s" % expire_time
             return  access_token
         else :
             self .__get_token_from_weixin_qy_api()
             return  self .__get_token_from_persistence_storage()
 
     @ staticmethod
     def  __is_token_expired():
         try :
             token_result_set  =  sqlite3_get_token()
         except  sqlite3.Error as e:
             print  e
             sys.exit( 1 )
         expire_time  =  token_result_set[ 0 ][ 1 ]
         expire_time  =  datetime.datetime.strptime(expire_time,  '%Y-%m-%d %H:%M:%S.%f' )
         now_time  =  datetime.datetime.now()
         if  now_time < expire_time:
             return  False
         else :
             return  True
 
     def  get( self ):
         return  self .__get_token_from_persistence_storage()

tag:微信公众号,python,sqlite3

--end--







本文转自 urey_pp 51CTO博客,原文链接:http://blog.51cto.com/dgd2010/1837284,如需转载请自行联系原作者


相关文章
|
4月前
|
API Python
python 详细理解 import ---- openstack自动import class 到 特定命名空间
python 详细理解 import ---- openstack自动import class 到 特定命名空间
46 0
|
1月前
|
数据采集 测试技术 API
python爬虫之app爬取-微信朋友圈
搭建appium环境,appium基本使用,API操作等等
77 0
|
8天前
|
索引 Python
python 格式化、set类型和class类基础知识练习(上)
python 格式化、set类型和class类基础知识练习
30 0
|
30天前
|
Python
Python类(class)中self的理解
Python类(class)中self的理解
18 0
|
1月前
|
数据采集 存储 关系型数据库
Python爬虫-使用代理获取微信公众号文章
使用代理爬取微信公众号文章
52 0
|
1月前
|
存储 设计模式 Python
Python中的类(Class)和对象(Object)
Python中的类(Class)和对象(Object)
27 0
|
1月前
|
监控 安全 API
怎么用Python找回微信撤回信息
怎么用Python找回微信撤回信息
33 0
|
3月前
|
小程序 关系型数据库 MySQL
基于Python和mysql开发的看图猜成语微信小程序(源码+数据库+程序配置说明书+程序使用说明书)
基于Python和mysql开发的看图猜成语微信小程序(源码+数据库+程序配置说明书+程序使用说明书)
|
3月前
|
小程序 关系型数据库 MySQL
基于Python和mysql开发的今天吃什么微信小程序(源码+数据库+程序配置说明书+程序使用说明书)
基于Python和mysql开发的今天吃什么微信小程序(源码+数据库+程序配置说明书+程序使用说明书)
|
4月前
|
机器人 API 持续交付
不懂浪漫?用30行Python代码实现自动给心上人发送微信消息
不懂浪漫?用30行Python代码实现自动给心上人发送微信消息