今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:
1
2
|
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
|
使用的方法:
1
2
|
r
=
redis.StrictRedis(host
=
xxxx, port
=
xxxx, db
=
xxxx)
r.xxxx()
|
有了ConnectionPool这个类之后,可以使用如下方法
1
2
|
pool
=
redis.ConnectionPool(host
=
xxx, port
=
xxx, db
=
xxxx)
r
=
redis.Redis(connection_pool
=
pool)
|
这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
class
StrictRedis(
object
):
........
def
__init__(
self
, host
=
'localhost'
, port
=
6379
,
db
=
0
, password
=
None
, socket_timeout
=
None
,
socket_connect_timeout
=
None
,
socket_keepalive
=
None
, socket_keepalive_options
=
None
,
connection_pool
=
None
, unix_socket_path
=
None
,
encoding
=
'utf-8'
, encoding_errors
=
'strict'
,
charset
=
None
, errors
=
None
,
decode_responses
=
False
, retry_on_timeout
=
False
,
ssl
=
False
, ssl_keyfile
=
None
, ssl_certfile
=
None
,
ssl_cert_reqs
=
None
, ssl_ca_certs
=
None
):
if
not
connection_pool:
..........
connection_pool
=
ConnectionPool(
*
*
kwargs)
self
.connection_pool
=
connection_pool
|
在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# COMMAND EXECUTION AND PROTOCOL PARSING
def
execute_command(
self
,
*
args,
*
*
options):
"Execute a command and return a parsed response"
pool
=
self
.connection_pool
command_name
=
args[
0
]
connection
=
pool.get_connection(command_name,
*
*
options)
#调用ConnectionPool.get_connection方法获取一个连接
try
:
connection.send_command(
*
args)
#命令执行,这里为Connection.send_command
return
self
.parse_response(connection, command_name,
*
*
options)
except
(ConnectionError, TimeoutError) as e:
connection.disconnect()
if
not
connection.retry_on_timeout
and
isinstance
(e, TimeoutError):
raise
connection.send_command(
*
args)
return
self
.parse_response(connection, command_name,
*
*
options)
finally
:
pool.release(connection)
#调用ConnectionPool.release释放连接
|
在来看看ConnectionPool类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
class
ConnectionPool(
object
):
...........
def
__init__(
self
, connection_class
=
Connection, max_connections
=
None
,
*
*
connection_kwargs):
#类初始化时调用构造函数
max_connections
=
max_connections
or
2
*
*
31
if
not
isinstance
(max_connections, (
int
,
long
))
or
max_connections <
0
:
#判断输入的max_connections是否合法
raise
ValueError(
'"max_connections" must be a positive integer'
)
self
.connection_class
=
connection_class
#设置对应的参数
self
.connection_kwargs
=
connection_kwargs
self
.max_connections
=
max_connections
self
.reset()
#初始化ConnectionPool 时的reset操作
def
reset(
self
):
self
.pid
=
os.getpid()
self
._created_connections
=
0
#已经创建的连接的计数器
self
._available_connections
=
[]
#声明一个空的数组,用来存放可用的连接
self
._in_use_connections
=
set
()
#声明一个空的集合,用来存放已经在用的连接
self
._check_lock
=
threading.Lock()
.......
def
get_connection(
self
, command_name,
*
keys,
*
*
options):
#在连接池中获取连接的方法
"Get a connection from the pool"
self
._checkpid()
try
:
connection
=
self
._available_connections.pop()
#获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,
会直接调用make_connection方法
except
IndexError:
connection
=
self
.make_connection()
self
._in_use_connections.add(connection)
#向代表正在使用的连接的集合中添加元素
return
connection
def
make_connection(
self
):
#在_available_connections数组为空时获取连接调用的方法
"Create a new connection"
if
self
._created_connections >
=
self
.max_connections:
#判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化
raise
ConnectionError(
"Too many connections"
)
self
._created_connections
+
=
1
#把代表已经创建的连接的数值+1
return
self
.connection_class(
*
*
self
.connection_kwargs)
#返回有效的连接,默认为Connection(**self.connection_kwargs)
def
release(
self
, connection):
#释放连接,链接并没有断开,只是存在链接池中
"Releases the connection back to the pool"
self
._checkpid()
if
connection.pid !
=
self
.pid:
return
self
._in_use_connections.remove(connection)
#从集合中删除元素
self
._available_connections.append(connection)
#并添加到_available_connections 的数组中
def
disconnect(
self
):
#断开所有连接池中的链接
"Disconnects all connections in the pool"
all_conns
=
chain(
self
._available_connections,
self
._in_use_connections)
for
connection
in
all_conns:
connection.disconnect()
|
execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:
1
2
3
4
5
6
7
|
class
Connection(
object
):
"Manages TCP communication to and from a Redis server"
def
__del__(
self
):
#对象删除时的操作,调用disconnect释放连接
try
:
self
.disconnect()
except
Exception:
pass
|
核心的链接建立方法是通过socket模块实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
def
_connect(
self
):
err
=
None
for
res
in
socket.getaddrinfo(
self
.host,
self
.port,
0
,
socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address
=
res
sock
=
None
try
:
sock
=
socket.socket(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,
1
)
# TCP_KEEPALIVE
if
self
.socket_keepalive:
#构造函数中默认 socket_keepalive=False,因此这里默认为短连接
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,
1
)
for
k, v
in
iteritems(
self
.socket_keepalive_options):
sock.setsockopt(socket.SOL_TCP, k, v)
# set the socket_connect_timeout before we connect
sock.settimeout(
self
.socket_connect_timeout)
#构造函数中默认socket_connect_timeout=None,即连接为blocking的模式
# connect
sock.connect(socket_address)
# set the socket_timeout now that we're connected
sock.settimeout(
self
.socket_timeout)
#构造函数中默认socket_timeout=None
return
sock
except
socket.error as _:
err
=
_
if
sock
is
not
None
:
sock.close()
.....
|
关闭链接的方法:
1
2
3
4
5
6
7
8
9
10
11
|
def
disconnect(
self
):
"Disconnects from the Redis server"
self
._parser.on_disconnect()
if
self
._sock
is
None
:
return
try
:
self
._sock.shutdown(socket.SHUT_RDWR)
#先shutdown再close
self
._sock.close()
except
socket.error:
pass
self
._sock
=
None
|
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。
本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1583541,如需转载请自行联系原作者