Redis源码分析

  1. 云栖社区>
  2. 博客>
  3. 正文

Redis源码分析

feilengcui008 2016-05-29 22:20:51 浏览722
展开阅读全文
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/feilengcui008/article/details/51534406

这一篇或者说一个系列用来记录Redis相关的一些源码分析,不定时更新。

目前已添加的内容:

  • Redis之eventloop
  • Redis数据结构之dict

Redis之eventloop

简介

Redis的eventloop实现也是比较平常的,主要关注文件描述符和timer相关事件,而且timer只是简单用一个单链表(O(n)遍历寻找最近触发的时间)实现。

流程

  • 主要在initServer(server.c)中初始化整个eventloop相关的数据结构与回调
// 注册系统timer事件
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  serverPanic("Can't create event loop timers.");
  exit(1);
}

// 注册poll fd的接收客户端连接的读事件
for (j = 0; j < server.ipfd_count; j++) {
  if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
  {
    serverPanic(
        "Unrecoverable error creating server.ipfd file event.");
  }
}
// 同上
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
      acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
  • acceptTcpHandler处理客户端请求,分配client结构,注册事件
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
acceptCommonHandler(cfd,0,cip);

  • createClient,创建客户端
// receieved a client, alloc client structure 
// and register it into eventpoll
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
  anetNonBlock(NULL,fd);
  anetEnableTcpNoDelay(NULL,fd);
  if (server.tcpkeepalive)
    anetKeepAlive(NULL,fd,server.tcpkeepalive);
  // register read event for client connection
  // the callback handler is readQueryFromClient
  // read into client data buffer
  if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
  {
    close(fd);
    zfree(c);
    return NULL;
  }
}
  • client读事件触发,读到buffer,解析client命令
dQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) 
--> processInputBuffer 

// handle query buffer
// in processInputBuffer(c);
if (c->reqtype == PROTO_REQ_INLINE) {
    if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
    if (processMultibulkBuffer(c) != C_OK) break;
} else {
    serverPanic("Unknown request type");
}

/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
    resetClient(c);
} else {
    /* Only reset the client when the command was executed. */
    // handle the client command 
    if (processCommand(c) == C_OK)
        resetClient(c);
    /* freeMemoryIfNeeded may flush slave output buffers. This may result
     * into a slave, that may be the active client, to be freed. */
    if (server.current_client == NULL) break;
}
  • 处理客户端命令
// in processCommand 
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
    c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
    c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
    queueMultiCommand(c);
    addReply(c,shared.queued);
} else {
    // call the cmd 
    // 进入具体数据结构的命令处理
    call(c,CMD_CALL_FULL);
    c->woff = server.master_repl_offset;
    if (listLength(server.ready_keys))
        handleClientsBlockedOnLists();
}

其他注意点

  • 关于timer的实现没有采用优先级队列(O(logn))等其他数据结构,而是直接采用O(n)遍历的单链表,是因为一般来说timer会较少?

Redis数据结构之dict

主要特点

Redis的hashtable实现叫dict,其实现和平常没有太大的区别,唯一比较特殊的地方是每个dict结构内部有两个实际的hashtable结构dictht,是为了实现增量哈希,故名思义,即当第一个dictht到一定负载因子后会触发rehash,分配新的dictht结构的动作和真正的rehash的动作是分离的,并且rehash被均摊到各个具体的操作中去了,这样就不会长时间阻塞线程,因为Redis是单线程。另外,增量hash可以按多步或者持续一定时间做。

主要数据结构

  • dictEntry => hashtable的bucket
  • dictType => 规定操作hashtable的接口
  • dictht => hashtable
  • dict => 对外呈现的”hashtable”
  • dictIterator => 迭代器,方便遍历
// dict.h
// hash table entry 
typedef struct dictEntry {
    void *key;  // key 
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;  // value
    struct dictEntry *next;  // linked list 
} dictEntry;

// operations(APIS) of some type of hashtable
typedef struct dictType {
    // hash function 
    unsigned int (*hashFunction)(const void *key);
    // copy key 
    void *(*keyDup)(void *privdata, const void *key);
    // copy value 
    void *(*valDup)(void *privdata, const void *obj);
    // key comparison 
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    // dtor for key 
    void (*keyDestructor)(void *privdata, void *key);
    // dtor for value 
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
// a hashtable 
typedef struct dictht {
    dictEntry **table;  // entries 
    unsigned long size;  // max size 
    unsigned long sizemask;  // mask 
    unsigned long used;  // current used 
} dictht;

typedef struct dict {
    dictType *type;  // type operations 
    void *privdata;  // for extension 
    dictht ht[2];    // two hashtables 
    // rehashing flag
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    // users number 
    unsigned long iterators; /* number of iterators currently running */
} dict;

/* If safe is set to 1 this is a safe iterator, that means, you can call
 * dictAdd, dictFind, and other functions against the dictionary even while
 * iterating. Otherwise it is a non safe iterator, and only dictNext()
 * should be called while iterating. */
typedef struct dictIterator {
    dict *d;
    long index;
    int table, safe;
    dictEntry *entry, *nextEntry;
    /* unsafe iterator fingerprint for misuse detection. */
    long long fingerprint;
} dictIterator;

主要接口

// dict.h
// create
dict *dictCreate(dictType *type, void *privDataPtr);

// expand or initilize the just created dict, alloc second hashtable of dict for incremental rehashing
int dictExpand(dict *d, unsigned long size);

// add, if in rehashing, do 1 step of incremental rehashing
int dictAdd(dict *d, void *key, void *val);
dictEntry *dictAddRaw(dict *d, void *key);

// update, if in rehashing, do 1 step of incremental rehashing
// can we first find and return the entry no matter it is update or add, so 
// we can speed up the update process because no need to do twice find process?
int dictReplace(dict *d, void *key, void *val);
dictEntry *dictReplaceRaw(dict *d, void *key);

// delete if in rehashing, do 1 step of incremental rehashing
int dictDelete(dict *d, const void *key);  // free the memory 
int dictDeleteNoFree(dict *d, const void *key);  // not free the memory

// can we use a double linked list to free the hash table, so speed up?
void dictRelease(dict *d);

// find an entry
dictEntry * dictFind(dict *d, const void *key);
void *dictFetchValue(dict *d, const void *key);

// resize to eh pow of 2 number just >= the used number of slots
int dictResize(dict *d);

// alloc a new iterator
dictIterator *dictGetIterator(dict *d);
// alloc a safe iterator 
dictIterator *dictGetSafeIterator(dict *d);
// next entry 
dictEntry *dictNext(dictIterator *iter);
void dictReleaseIterator(dictIterator *iter);

// random sampling
dictEntry *dictGetRandomKey(dict *d);
unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count);

// get stats info
void dictGetStats(char *buf, size_t bufsize, dict *d);

// murmurhash 
unsigned int dictGenHashFunction(const void *key, int len);
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len);

// empty a dict 
void dictEmpty(dict *d, void(callback)(void*));

void dictEnableResize(void);
void dictDisableResize(void);

// do n steps rehashing
int dictRehash(dict *d, int n);
// do rehashing for a ms milliseconds
int dictRehashMilliseconds(dict *d, int ms);

// hash function seed 
void dictSetHashFunctionSeed(unsigned int initval);
unsigned int dictGetHashFunctionSeed(void);

// scan a dict
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);

一些可能优化的地方

  • 在dictReplace中能否统一add和update的查找,无论是add还是update都返回一个entry,用标识表明是add还是update,而不用在update时做两次查找,从而提升update的性能

  • 在release整个dict时,是循环遍历所有头bucket,最坏情况接近O(n),能否用双向的空闲链表优化(当然这样会浪费指针所占空间)

网友评论

登录后评论
0/500
评论
feilengcui008
+ 关注