
来源:阿里云开发者
阿里妹导读
简介
原理
1.Redis内核会暴露出/导出很多API给module使用(如内存分配接口、redis核心db结构的操作接口),注意这些API是redis自己解析绑定的,而不是靠动态连接器解析的。
加载
int moduleLoad(const char *path, void **module_argv, int module_argc, int is_loadex) {int (*onload)(void *, void **, int);void *handle;struct stat st;if (stat(path, &st) == 0) {/* This check is best effort */if (!(st.st_mode & (S_IXUSR | S_IXGRP | S_IXOTH))) {serverLog(LL_WARNING, "Module %s failed to load: It does not have execute permissions.", path);return C_ERR;}}// 打开module sohandle = dlopen(path,RTLD_NOW|RTLD_LOCAL);if (handle == NULL) {serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());return C_ERR;}// 获取module中的onload函数符号地址onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad");if (onload == NULL) {dlclose(handle);serverLog(LL_WARNING,"Module %s does not export RedisModule_OnLoad() ""symbol. Module not loaded.",path);return C_ERR;}RedisModuleCtx ctx;moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_TEMP_CLIENT); /* We pass NULL since we don't have a module yet. */// 调用onload对module进行初始化if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {serverLog(LL_WARNING,"Module %s initialization failed. Module not loaded",path);if (ctx.module) {moduleUnregisterCommands(ctx.module);moduleUnregisterSharedAPI(ctx.module);moduleUnregisterUsedAPI(ctx.module);moduleRemoveConfigs(ctx.module);moduleFreeModuleStructure(ctx.module);}moduleFreeContext(&ctx);dlclose(handle);return C_ERR;}/* Redis module loaded! Register it. *///... 无关代码省略 ...moduleFreeContext(&ctx);return C_OK;}
API 绑定
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {if (RedisModule_Init(ctx, "helloworld", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)return REDISMODULE_ERR;// ... 无关代码省略 ...}
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {void *getapifuncptr = ((void**)ctx)[0];RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;// 绑定redis导出的apiREDISMODULE_GET_API(Alloc);REDISMODULE_GET_API(TryAlloc);REDISMODULE_GET_API(Calloc);REDISMODULE_GET_API(Free);REDISMODULE_GET_API(Realloc);REDISMODULE_GET_API(Strdup);REDISMODULE_GET_API(CreateCommand);REDISMODULE_GET_API(GetCommand);// ... 无关代码省略 ...}
#define REDISMODULE_GET_API(name) \RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ ## name))void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_flags) {memset(out_ctx, 0 ,sizeof(RedisModuleCtx));// 这里把GetApi地址传递给moduleout_ctx->getapifuncptr = (void*)(unsigned long)&RM_GetApi;out_ctx->module = module;out_ctx->flags = ctx_flags;// ... 无关代码省略 ...}
void *getapifuncptr = ((void**)ctx)[0];RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
struct RedisModuleCtx { // getapifuncptr是第一个成员 void *getapifuncptr; /* NOTE: Must be the first field. */ struct RedisModule *module; /* Module reference. */ client *client; /* Client calling a command. */ // ... 无关代码省略 ...};搞清楚了RM_GetApi是怎么被导出的原理后,我们来接着看下RM_GetApi内部在做什么:
int RM_GetApi(const char *funcname, void **targetPtrPtr) { /* Lookup the requested module API and store the function pointer into the * target pointer. The function returns REDISMODULE_ERR if there is no such * named API, otherwise REDISMODULE_OK. * * This function is not meant to be used by modules developer, it is only * used implicitly by including redismodule.h. */ dictEntry *he = dictFind(server.moduleapi, funcname); if (!he) return REDISMODULE_ERR; *targetPtrPtr = dictGetVal(he); return REDISMODULE_OK;}/* Register all the APIs we export. Keep this function at the end of the* file so that's easy to seek it to add new entries. */void moduleRegisterCoreAPI(void) {server.moduleapi = dictCreate(&moduleAPIDictType);server.sharedapi = dictCreate(&moduleAPIDictType);// 向全局哈希表中注册函数REGISTER_API(Alloc);REGISTER_API(TryAlloc);REGISTER_API(Calloc);REGISTER_API(Realloc);REGISTER_API(Free);REGISTER_API(Strdup);REGISTER_API(CreateCommand);// ... 无关代码省略 ...}
int moduleRegisterApi(const char *funcname, void *funcptr) {return dictAdd(server.moduleapi, (char*)funcname, funcptr);}moduleRegisterApi("RedisModule_"
一些最佳实践
入口函数禁用c++ mangle
extern "C" __attribute__((visibility("default"))) int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {// Init code and command registerreturn REDISMODULE_OK;}
接管内存统计
REDISMODULE_API void * (*RedisModule_Alloc)(size_t bytes) REDISMODULE_ATTR;REDISMODULE_API void * (*RedisModule_Realloc)(void *ptr, size_t bytes) REDISMODULE_ATTR;REDISMODULE_API void (*RedisModule_Free)(void *ptr) REDISMODULE_ATTR;REDISMODULE_API void * (*RedisModule_Calloc)(size_t nmemb, size_t size) REDISMODULE_ATTR;
new/operator new/placement new
1.分配空间(使用operator new)
2.初始化对象(使用placement new或者类型强转),即调用对象的构造函数
void * operator new(size_t, void *location) { return location; }可见,要想实现修改new默认使用的内存分配,我们可以使用两种方式。
placement new
Object *p=(Object*)RedisModule_Alloc(sizeof(Object));new (p)Object();
同时注意析构时也需要特殊处理:
p->~Object();RedisModule_Free(p);
operator new
_GLIBCXX_WEAK_DEFINITION void *operator new (std::size_t sz) _GLIBCXX_THROW (std::bad_alloc){void *p;/* malloc (0) is unpredictable; avoid it. */if (sz == 0)sz = 1;while (__builtin_expect ((p = malloc (sz)) == 0, false)){new_handler handler = std::get_new_handler ();if (! handler)_GLIBCXX_THROW_OR_ABORT(bad_alloc());handler ();}return p;}
void *operator new(std::size_t size) { return RedisModule_Alloc(size); }void operator delete(void *ptr) noexcept { RedisModule_Free(ptr); }operator new在多个module之间的可见性
静态链接/动态链接c++标准库
静态链接
动态链接
使用block机制提高并发处理能力

图1 典型的异步处理模型
block虽然看上去很美好很强大,但是需要小心处理一些坑,如:
命令虽然异步执行了,但是写AOF和向备库复制依然同步做。如果提前写AOF并向备库复制,万一后面命令执行失败了就无法回滚; 因为备库是不允许执行block命令的,因此主库需要将block类型的命令rewrite成非block类型的命令复制给备库; 异步执行时,在open一个key时不能只看keyname,因为可能在异步线程执行之前,原来的key已经被删除了,然后又有一个同名的key被创建,即当前看到的key已经不是原来的key了; 设计好block类型的命令是否支持事务和lua; 如果采用线程池,需要注意相同key在线程池中的保序执行问题(即相同key的处理不能乱序);
避免和其他Module符号冲突
因为redis可以同时加载多个module,这些module可能来自不同的团队和个人,因此存在一定的概率,不同的module会定义相同的函数名。为了避免符号冲突导致的未定义行为,建议每个module都把除了Onload和Unload函数之外的符号都隐藏掉,可以在给编译器传递一些flag实现。如gcc:
-fvisibility=hidden
小心Fork陷阱
处理inflight状态的命令
如果module采用异步执行模型(参看前文block一节),那么当redis做aofrewrite或bgsave时,在redis fork子进程的瞬间,如果还有一些命令处于inflight状态,那么此时新产生的base aof或者rdb可能并不会包含这些inflight时的数据,虽然这个看上去也没有太大问题,因为inflight的命令最终完成时也会把命令写入增量的aof中。但是,为了和redis原来的行为兼容(即fork时一定没有处于inflight状态的命令,是一个静止的状态),module最好还是保证所有的inflight状态的命令都执行完了再执行fork。
在module中可以通过redis暴露的RedisModuleEvent_ForkChild事件,在fork执行之前执行一个我们传入的回调函数。
RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ForkChild, waitAllInflightTaskFinish);
比如在waitAllInflightTaskFinish中等待队列为空(即所有task都执行结束):
static void waitAllInflightTaskFinish() { while (!thread_pool->idle()) ;}int pthread_atfork(void (*prepare)(void), void (*parent)void(), void (*child)(void));
避免死锁
我们知道通过fork创建的一个子进程几乎但不完全与父进程相同。子进程得到与父进程用户级虚拟地址空间相同的(但是独立的)一份拷贝,包括文本、数据和bss段、堆以及用户栈等。子进程还获得与父进程任何打开文件描述符相同的拷贝,这就意味着子进程可以读写父进程中任何打开的文件,父进程和子进程之间最大的区别在于它们有着不同的PID。
但是有一点需要注意的是,在Linux中,fork的时候只复制当前线程到子进程,在fork(2)-Linux Man Page中有着这样一段相关的描述:
The child process is created with a single thread--the one that called fork(). The entire virtual address space of the parent is replicated in the child, including the states of mutexes, condition variables, and other pthreads objects; the use of pthread_atfork(3) may be helpful for dealing with problems that this can cause.
也就是说除了调用fork的线程外,其他线程在子进程中“蒸发”了。
因此,如果在一些异步线程中持有了一些资源的锁,那么在子进程中,因为这些线程消失了,那么子进程可能会发生死锁的问题。
解决方法和解决inflight一样,保证在fork之前所有的锁都释放掉即可。(其实只要所有inflight状态的命令都执行完了,一般锁也就都释放了)
确保向备库复制的AOF保持语义幂等
Redis的主备复制首要目标就是保证主备的一致性。因此备库要做的就是无条件接收来自主库的复制内容,并严格保持一致。但是对于一些比较特殊的命令而言,需要小心处理。
以Tair暴露的Tair String为例,支持给数据设置版本号,比如用户写入:
EXSET key value VER 10
EXSET key value ABS 11
支持graceful shutdown
Module内部可能会启动一些异步线程或者管理一些异步资源,这些资源需要在redis shutdown时被处理(如停止、析构、写磁盘等),否则redis在退出时可能发生coredump。
在redis中,可以注册RedisModuleEvent_Shutdown事件实现,当redis关机时会回调我们传入的ShutdownCallback。
当然,在较新的redis版本中,module也可以通过暴露unload函数来实现类似的功能。
RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Shutdown, ShutdownCallback);
实现aof文件压缩功能,如将一个hash的所有写操作重写为一条hmset命令(也可能是多条); 避免重写后的一条aof过大(如超过500MB),如果超过,则需要rewrite成多条cmd,同时需要确保这些多条cmd是否需要以事务的方式执行(即需要操作命令执行的隔离性); 对于一些复杂结构,无法简单重写为已有命令的module,可以单独实现一个“内部”命令,如xxxload/xxxdump等,用于实现对该module数据结构的序列化和反序列化,该命令不会对外暴露给客户端; RedisModule_EmitAOF中如果包含array类型的参数(即使用'v' flag传递的参数),则array的长度一定要使用size_t类型,否则可能会遇到诡异的错误;

RDB编码具有向后兼容能力
RDB是二进制格式的序列化和反序列化,因此相对而言比较简单。但是需要注意的是,如果数据结构以后的序列化方式可能会改变,则最好加上编解码的版本,这样在升级的时候可以保证兼容性,如下:
void *xxx_RdbLoad(RedisModuleIO *rdb, int encver) { if (encver == version1 ) { /* version1 format */ } else if (encver == version2 ){ /* version2 format */ }}一些命令实现的建议
参数检验:尽量在命令开始处对参数合法性(如参数个数是否正确、参数类型是否正确等)进行校验,尽量避免命令没有成功执行的情况下提前污染了keyspace(如提前使用了RedisModule_ModuleTypeSetValue修改主数据库) 错误信息:返回的错误信息应尽可能简单明了,阐明错误类型是什么 响应类型保持统一:注意命令在各种情况下的返回类型要统一,如key不存在、key类型错误、执行成功以及一些参数错误时的响应类型。通常情况下,除了返回错误类型之外,其他的所有情况都应该返回相同类型,如都返回一个简单字符串、或者都返回一个数组(哪怕是一个空数组)。这样客户端在解析命令返回值时比较方便 确认读写类型:命令应严格区分读写类型,这涉及到该命令能否在replica上执行、以及该命令是否需要进行同步、写aof等 复制幂等性和AOF:对于写命令,需要自行使用RedisModule_ReplicateVerbatim或者RedisModule_Replicate进行主备复制和写AOF(必要的时候需要对原命令进程重写)。其中,使用RedisModule_Replicate产生的AOF,前后都会被自动加上multi/exec(保证module内产生的命令具有隔离性)。因此,推荐优先使用RedisModule_ReplicateVerbatim进行复制和写AOF。但是,如果命令中存在诸如版本号等参数,则必须使用RedisModule_Replicate将版本号重写为绝对版本号,将过期时间重写为绝对过期时间。另外,如果一个命令最终RedisModule_Replicate对命令进行重写,则需要保证重写后的命令不会再次发生重写。 复用argv参数:命令传入的argv中的参数类型为RedisModuleString ** ,这些RedisModuleString在命令返回后会被自动Free掉,因此命令中不应该直接引用这些RedisModuleString指针,如果非要这么做(如避免内存拷贝),可以使用RedisModule_RetainString/RedisModule_HoldString增加该RedisModuleString的引用计数,但是之后一定要记得自己手动Free key打开方式:在使用RedisModule_OpenKey打开一个key的时候,要严格区分打开的类型:REDISMODULE_READ、REDISMODULE_WRITE,因为这影响着是否更新内部的stat_keyspace_misses和stat_keyspace_hits信息,还影响的了过期再写入的问题。同时,使用REDISMODULE_READ方式打开的key不能被删除,否则报错 key类型处理:目前只有string的set命令可以强行覆盖其他类型的key,其他的命令在遇到key存在但类型不匹配时需要返回""WRONGTYPE Operation against a key holding the wrong kind of value"错误 多key命令的cluster支持:对于多key的命令,一定要处理好firstkey、lastkey、keystep这三个值,因为只有这三个值对了,在cluster模式下,redis才会去检查这些key是否存在CROSS SLOTS的问题 全局索引、结构:module中如果有自己维护的全局索引,需要谨慎索引中是否包含dbid、key等信息,因为redis的move、rename、swapdb等命令会“偷梁换柱”式的更换key的名字、交换两个dbid,因此此时如果索引没有同步更新,将得到意想不到的错误
根据角色来确定动作:module本身运行的redis可能是一个主也可能是一个备,module内部可以使用RedisModule_GetContextFlags来判断当前redis的角色,并根据不同的角色来采取不同的行为(如是否进行主动过期处理等)
总结
Tair当前支持了非常多的扩展数据结构(其中redis 5.x企业版使用module方式,Tair自研企业版 6.x使用builtin方式),基本涵盖了各种应用场景(具体见介绍文档),其中既有像TairString和TairHash等小而美的数据结构(已经开源),也有像Tair Search和Vector等更为复杂和强大的计算型数据结构,充分满足AIGC背景下各种业务场景,欢迎使用。
介绍文档:https://help.aliyun.com/zh/redis/developer-reference/extended-data-structures-of-apsaradb-for-redis-enhanced-edition
fork(2)-Linux Man Page:http://linux.die.net/man/2/fork