背景
因为更换IDC的原因,我们需要迁移缓存到新的机房,开发同学提出老的缓存有1.2亿无效(未设置过期时间)的key和正常在用的业务key,在迁移之前可以先指定前缀将key删除。那么问题来了,如何快速删除1.2亿的key呢?
如何获取指定的 key
大家都知道由于Redis的单线程服务模式,命令 keys * 会阻塞正常的业务请求,所以肯定不行。
在这里我们利用Redis 提供的 SCAN 功能。SCAN 命令是一个基于游标的迭代器(cursor based iterator): SCAN 命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数, 以此来延续之前的迭代过程。
当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束。 SCAN的语法如下
SCAN cursor [MATCH pattern] [COUNT count]
其中 cousor 是游标,MATCH 则支持正则匹配,我们正好可以利用此功能,比如匹配 前缀为"dba_"的key, COUNT 是每次获取多少个key。
redis 127.0.0.1:6379> scan 0 1) "17" 2) 1) "key:12" 2) "key:8" 3) "key:4" 4) "key:14" 5) "key:16" 6) "key:17" 7) "key:15" 8) "key:10" 9) "key:3" 10) "key:7" 11) "key:1" redis 127.0.0.1:6379> scan 17 1) "0" 2) 1) "key:5" 2) "key:18" 3) "key:0" 4) "key:2" 5) "key:19" 6) "key:13" 7) "key:6" 8) "key:9" 9) "key:11"
在上面这个例子中, 第一次迭代使用 0 作为游标, 表示开始一次新的迭代。第二次迭代使用的是第一次迭代时返回的游标, 也即是命令回复第一个元素的值 —— 17 。 在第二次调用 SCAN 命令时, 命令返回了游标 0 , 这表示迭代已经结束, 整个数据集(collection)已经被完整遍历过了。
从上面的示例可以看到, SCAN 命令的回复是一个包含两个元素的数组, 第一个数组元素是用于进行下一次迭代的新游标, 而第二个数组元素则是一个数组, 这个数组中包含了所有被迭代的元素。
注意:以 0 作为游标开始一次新的迭代, 一直调用 SCAN 命令, 直到命令返回游标 0 , 我们称这个过程为一次完整遍历(full iteration)。 我们会在后面的代码实现中利用此特点。
Python的redis 模块提供 scan_iter 迭代器来遍历key,其返回的结果迭代器对象。
In [53]: ret=r.scan_iter('dba_*',20) In [54]: print ret
至此,我们解决了如何获取数据的问题,下面思考第二个问题。
如何执行删除
这个相对比较简单,Redis 提供DEL 命令
127.0.0.1:6379[2]> get "dba_7" "r06cVX9" 127.0.0.1:6379[2]> get "dba_1" "ETX57PA" 127.0.0.1:6379[2]> del "dba_7" "dba_1" (integer) 2 127.0.0.1:6379[2]>
在redis-py 中,提供了delete(key),delete(*key)的函数, 其中参数 *key 是多个值的列表。 到这里,我们大致可以想到获取key,然后批量删除
(mytest)? test git:(master) ? python delete_key.py initial keys successfully,use time: 90.2497739792 normal ways end at: 68.685477972 normal ways delete numbers: 1000000
常规方式的删除10W个key耗时68.7秒,如果是1.2亿个key 要多少时间呢?68*1000/3600=18.8小时。能不能更快呢?
如何提高执行速度
Redis本身是基于Request/Response协议的,客户端发送一个命令,等待Redis应答,Redis在接收到命令,处理后应答。其中发送命令加上返回结果的时间称为(Round Time Trip)RRT-往返时间。如果客户端发送大量的命令给Redis,那就是等待上一条命令应答后再执行再执行下一条命令,这中间不仅仅多了RTT,而且还频繁的调用系统IO,发送网络请求。
Pipeline(流水线)功能极大的改善了上面的缺点。Pipeline能将一组Redis命令进行组装,然后一次性传输给Redis,再将Redis执行这组命令的结果按照顺序返回给客户端。
需要注意的是Pipeline 虽然好用,但是Pipline组装的命令个数不能没有限制,否则一次组装数据量过大,一方面增加客户端的等待时间,另一方面会造成网络阻塞,需要批量组装。使用Pepline 和常规方式的性能对比如下:
代码
-
# encoding: utf-8
-
"""
-
author: yangyi@youzan.com
-
time: 2018/3/9 下午8:35
-
func:
-
"""
-
import redis
-
import random
-
import string
-
import time
-
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=2)
-
r = redis.Redis(connection_pool=pool)
-
-
-
def random_str():
-
return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(7))
-
-
-
def init_keys():
-
start_time = time.time()
-
for i in xrange(0, 20):
-
key_name = 'dba_'+str(i)
-
value_name = random_str()
-
r.set(key_name, value_name)
-
print 'initial keys successfully,use time:', time.time() - start_time
-
-
-
def del_keys_without_pipe():
-
start_time = time.time()
-
result_length = 0
-
for key in r.scan_iter(match='dba_*', count=2000):
-
r.delete(key)
-
result_length += 1
-
print "normal ways end at:", time.time() - start_time
-
print "normal ways delete numbers:", result_length
-
-
-
def del_keys_with_pipe():
-
start_time = time.time()
-
result_length = 0
-
pipe = r.pipeline()
-
for key in r.scan_iter(match='dba_*', count=5000):
-
pipe.delete(key)
-
result_length += 1
-
if result_length % 5000 == 0:
-
pipe.execute()
-
pip_time = time.time()
-
print "use pipeline scan time ", time.time() - start_time
-
pipe.execute()
-
-
print "use pipeline end at:", time.time() - pip_time
-
print "use pipeline ways delete numbers:", result_length
-
-
-
def main():
-
init_keys()
-
del_keys_without_pipe()
-
init_keys()
-
del_keys_with_pipe()
-
-
-
if __name__ == '__main__':
-
main()
-
# encoding: utf-8
-
"""
-
author: yangyi@youzan.com
-
time: 2018/3/9 下午8:35
-
func:
-
"""
-
import redis
-
import random
-
import string
-
import time
-
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=2)
-
r = redis.Redis(connection_pool=pool)
-
-
-
def random_str():
-
return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(7))
-
-
-
def init_keys():
-
start_time = time.time()
-
for i in xrange(0, 20):
-
key_name = 'dba_'+str(i)
-
value_name = random_str()
-
r.set(key_name, value_name)
-
print 'initial keys successfully,use time:', time.time() - start_time
-
-
-
def del_keys_without_pipe():
-
start_time = time.time()
-
result_length = 0
-
for key in r.scan_iter(match='dba_*', count=2000):
-
r.delete(key)
-
result_length += 1
-
print "normal ways end at:", time.time() - start_time
-
print "normal ways delete numbers:", result_length
-
-
-
def del_keys_with_pipe():
-
start_time = time.time()
-
result_length = 0
-
pipe = r.pipeline()
-
for key in r.scan_iter(match='dba_*', count=5000):
-
pipe.delete(key)
-
result_length += 1
-
if result_length % 5000 == 0:
-
pipe.execute()
-
pip_time = time.time()
-
print "use pipeline scan time ", time.time() - start_time
-
pipe.execute()
-
-
print "use pipeline end at:", time.time() - pip_time
-
print "use pipeline ways delete numbers:", result_length
-
-
-
def main():
-
init_keys()
-
del_keys_without_pipe()
-
init_keys()
-
del_keys_with_pipe()
-
-
-
if __name__ == '__main__':
- main()
如果读者朋友有其他更好的方式的,欢迎交流。
资料
https://pypi.python.org/pypi/redis
本文同步发布于个人公众号 ,欢迎关注 ,交流技术。