序列化和写盘
Tokudb数据节点写盘主要是由后台线程异步完成的:
- checkpoint线程:把cachetable(innodb术语buffer pool)中所有脏页写回
- evictor线程:释放内存,如果victim节点是dirty的,需要先将数据写回。
数据在磁盘上是序列化过的,序列化的过程就是把一个数据结构转换成字节流。
写数据包括两个阶段:
- 序列化:把结构化数据转成字节流
- 压缩:对序列化好的数据进行压缩
tokudb序列化和压缩单位是partition,对于internal节点,就是把msg buffer序列化并压缩;对于leaf节点,就是把basement node序列化并压缩。
一个节点(node)在磁盘上是如何存储的呢? 节点数据在写盘时会被写到某个offset开始的位置,这个offset是从blocktable里面分配的一个空闲的空间。我们后面会专门写一篇有关btt(Block Translation Table)和block table的文章。 一个node的数据包含:header,pivot key和partition三部分:
- header:节点meta信息
- pivot key:记录了每个partition的key区间
- partition:排序数据;一个node如果包含多个partition,这些partition是依次顺序存放的
有趣的是,压缩算法的信息是存放在partition压缩buffer的第一个字节。所以,tokudb支持FT索引内部同时使用多种压缩算法。
反序列化和读盘
Tokudb读盘的过程是在cachetable里通过调用get_and_pin系列函数实现
- 前景线程调用get_and_pin系列函数
- cleaner线程调用bring_node_fully_into_memory,这个函数调用pf_callback把不在内存中的那些partition读到内存。
数据从磁盘读到内存之前需要进行解压缩,然后对解压缩好的buffer进行反序列化,转换成内存数据结构。反序列化是使用序列化相反的方法把数据解析出来。
前面提过序列化和压缩的单位是partition,反序列化和解压缩的单位也是partition。
酱,节点数据就可以被FT层访问了。
序列化和压缩过程详解
这里顺便提一下BTT (Block Translation Table),这个表记录了节点(blocknum)在FT文件存储位置(offset)的映射关系。
为什么要引入这个表?Tokudb刷脏时,数据被写到一个新的空闲位置,避免了in-place update,简化recovery过程。
toku_ftnode_flush_callback是调用get_and_pin系列函数提供的flush_callback回调,checkpoint线程(也包含checkpoint thread pool的线程,在checkpoint过程中帮助前景线程做节点数据的回写)或evictor线程在这个函数里面会调用toku_serialize_ftnode_to做序列化和压缩工作。
toku_serialize_ftnode_to比较简单,首先调用toku_serialize_ftnode_to_memory执行序列化和压缩,然后调用blocktable.realloc_on_disk,为blocknum分配一个新的offset,最后调用pwrite把压缩的buffer写到盘上,回写完成清node->dirty标记。
这里单独说一下toku_serialize_ftnode_to_memory的第6个参数in_parallel,true表示并行处理序列化和压缩过程,false表示串行处理。
toku_ftnode_flush_callback通常是在evictor或者checkpoint线程上下文调用的,不影响前景线程服务客户端,这个参数一般是false,只有在loader场景下是true。
toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT ft, bool for_checkpoint) { size_t n_to_write; size_t n_uncompressed_bytes; char *compressed_buf = nullptr; // because toku_serialize_ftnode_to is only called for // in toku_ftnode_flush_callback, we pass false // for in_parallel. The reasoning is that when we write // nodes to disk via toku_ftnode_flush_callback, we // assume that it is being done on a non-critical // background thread (probably for checkpointing), and therefore // should not hog CPU, // // Should the above facts change, we may want to revisit // passing false for in_parallel here // // alternatively, we could have made in_parallel a parameter // for toku_serialize_ftnode_to, but instead we did this. int r = toku_serialize_ftnode_to_memory( node, ndd, ft->h->basementnodesize, ft->h->compression_method, do_rebalancing, toku_drd_unsafe_fetch(&toku_serialize_in_parallel), &n_to_write, &n_uncompressed_bytes, &compressed_buf ); if (r != 0) { return r; } // If the node has never been written, then write the whole buffer, including the zeros invariant(blocknum.b>=0); DISKOFF offset; // Dirties the ft ft->blocktable.realloc_on_disk(blocknum, n_to_write, &offset, ft, fd, for_checkpoint); tokutime_t t0 = toku_time_now(); toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset); tokutime_t t1 = toku_time_now(); tokutime_t io_time = t1 - t0; toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint); toku_free(compressed_buf); node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction. return 0; }
AI 代码解读
序列化和压缩过程是在toku_serialize_ftnode_to_memory实现,这个函数比较长,我们分成3段来看。
- partition序列化和压缩
- pivot key序列化和压缩
- header序列化
partition序列化和压缩
toku_serialize_ftnode_to_memory的第5个参数do_rebalancing表示leaf节点在写回之前是否要做rebalance,这个参数是在toku_ftnode_flush_callback指定的,如果写回的是数据节点本身,那么是需要做rebalance的。
toku_serialize_ftnode_to_memory首先确保整个数据节点都在内存中,这么做是因为节点的partition数据是依次顺序存放的;然后根据do_rebalancing决定是否要对leaf节点做rebalance;接着是一大段内存分配:
- sb包含节点partition压缩数据的数组,每个元素包含partition的uncompressed的buffer和compressed的buffer
- ndd是指针数组,记录了每个partition压缩后数据的offset和size
这里有个小的优化,并没有为每个partition申请compressed的buffer,而是申请了一个足够大的buffer,每个partition使用其中的一段。uncompressed的buffer也是一样处理的。
足够大的buffer是什么意思呢?
- uncompressed的buffer:各个partition的size总和。
- compressed的buffer:压缩后的最大可能长度加上8个字节的overhead(每个partition压缩前的size和压缩后的size)
使用不同压缩算法,压缩之后的最大可能长度是不同的。
分配好buffer之后,调用serialize_and_compress_in_parallel或者serialize_and_compress_serially进行序列化和压缩。
int toku_serialize_ftnode_to_memory(FTNODE node, FTNODE_DISK_DATA* ndd, unsigned int basementnodesize, enum toku_compression_method compression_method, bool do_rebalancing, bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false /*out*/ size_t *n_bytes_to_write, /*out*/ size_t *n_uncompressed_bytes, /*out*/ char **bytes_to_write) // Effect: Writes out each child to a separate malloc'd buffer, then compresses // all of them, and writes the uncompressed header, to bytes_to_write, // which is malloc'd. // // The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed). // 512-byte padding is for O_DIRECT to work. { toku_ftnode_assert_fully_in_memory(node); if (do_rebalancing && node->height == 0) { toku_ftnode_leaf_rebalance(node, basementnodesize); } const int npartitions = node->n_children; // Each partition represents a compressed sub block // For internal nodes, a sub block is a message buffer // For leaf nodes, a sub block is a basement node toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions); struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get()); XREALLOC_N(npartitions, *ndd); // // First, let's serialize and compress the individual sub blocks // // determine how large our serialization and compression buffers need to be. size_t serialize_buf_size = 0, compression_buf_size = 0; for (int i = 0; i < node->n_children; i++) { sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i); sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size); serialize_buf_size += sb[i].uncompressed_size; compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size } // give each sub block a base pointer to enough buffer space for serialization and compression toku::scoped_malloc serialize_buf(serialize_buf_size); toku::scoped_malloc compression_buf(compression_buf_size); for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) { sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset; sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset; uncompressed_offset += sb[i].uncompressed_size; compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size invariant(uncompressed_offset <= serialize_buf_size); invariant(compressed_offset <= compression_buf_size); } // do the actual serialization now that we have buffer space struct serialize_times st = { 0, 0 }; if (in_parallel) { serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st); } else { serialize_and_compress_serially(node, npartitions, compression_method, sb, &st); }
AI 代码解读
serialize_and_compress_serially就是串行调用serialize_and_compress_partition进行序列化和压缩。
static void serialize_and_compress_serially(FTNODE node, int npartitions, enum toku_compression_method compression_method, struct sub_block sb[], struct serialize_times *st) { for (int i = 0; i < npartitions; i++) { serialize_and_compress_partition(node, i, compression_method, &sb[i], st); } }
AI 代码解读
serialize_and_compress_in_parallel使用了threadpool来并行执行序列化和压缩,每个partition由一个专门的线程来处理。当前上下文也可以执行序列化和压缩,所以threadpool只创建了(npartitions-1)个线程。
threadpool线程执行的函数也是serialize_and_compress_partition;threadpool线程和当前上下文之间是使用work进行同步的。
static void * serialize_and_compress_worker(void *arg) { struct workset *ws = (struct workset *) arg; while (1) { struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws); if (w == NULL) break; int i = w->i; serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st); } workset_release_ref(ws); return arg; } static void serialize_and_compress_in_parallel(FTNODE node, int npartitions, enum toku_compression_method compression_method, struct sub_block sb[], struct serialize_times *st) { if (npartitions == 1) { serialize_and_compress_partition(node, 0, compression_method, &sb[0], st); } else { int T = num_cores; if (T > npartitions) T = npartitions; if (T > 0) T = T - 1; struct workset ws; ZERO_STRUCT(ws); workset_init(&ws); struct serialize_compress_work work[npartitions]; workset_lock(&ws); for (int i = 0; i < npartitions; i++) { work[i] = (struct serialize_compress_work) { .base = , .node = node, .i = i, .compression_method = compression_method, .sb = sb, .st = { .serialize_time = 0, .compress_time = 0} }; workset_put_locked(&ws, &work[i].base); } workset_unlock(&ws); toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws); workset_add_ref(&ws, T); serialize_and_compress_worker(&ws); workset_join(&ws); workset_destroy(&ws); // gather up the statistics from each thread's work item for (int i = 0; i < npartitions; i++) { st->serialize_time += work[i].st.serialize_time; st->compress_time += work[i].st.compress_time; } } }
AI 代码解读
pivot key序列化和压缩
回到toku_serialize_ftnode_to_memory,序列化partition之后就是序列化pivot key的过程。 sb_node_info存放pivot key压缩数据的信息:
- uncompressed_ptr和uncompressed_size是未压缩数据的buffer和size
- compressed_ptr和compressed_size_bound是压缩后数据的buffer和压缩后最大可能的size+8个字节的overhead(未压缩数据size和压缩后数据的size)
前面提到,压缩后的size是由压缩算法决定,不同的压缩算法压缩之后最大可能的size是不同的。
toku_serialize_ftnode_to_memory调用serialize_and_compress_sb_node_info把pivot key信息序列化并压缩。
pivot key的compressed buffer头8个字节分别存储pivot key的compressed size和uncompressed size,从第9个字节开始才是压缩的字节流;而checksum是针对整个compressed buffer做的。
// // Now lets create a sub-block that has the common node information, // This does NOT include the header // // determine how large our serialization and copmression buffers need to be struct sub_block sb_node_info; sub_block_init(&sb_node_info); size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node); size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size); toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size); toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size sb_node_info.uncompressed_size = sb_node_info_uncompressed_size; sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get(); sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound; sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get(); // do the actual serialization now that we have buffer space serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st); // // At this point, we have compressed each of our pieces into individual sub_blocks, // we can put the header and all the subblocks into a single buffer and return it. // // update the serialize times, ignore the header for simplicity. we captured all // of the partitions' serialize times so that's probably good enough. toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);
AI 代码解读
header序列化
序列化pivot key之后,toku_serialize_ftnode_to_memory计算节点node压缩前size和压缩后的size。 计算方法很简单:partition的size总和 + pivot key的size + header的size + 4个字节的overhead(pivot key的checksum)。
节点node压缩之后的size是为分配压缩后的数据buffer,为了支持direct I/O,分配的buffer和buffer size必须是512对齐的。
分配的buffer size记在n_bytes_to_write返回给调用函数;压缩之后的数据存储在bytes_to_write指向的buffer中。
节点node压缩之前的size,就是为了返回给调用函数,记在n_uncompressed_bytes参数中。
// The total size of the node is: // size of header + disk size of the n+1 sub_block's created above uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header + sb_node_info.compressed_size // compressed nodeinfo (without its checksum) + 4); // nodeinfo's checksum uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header + sb_node_info.uncompressed_size // uncompressed nodeinfo (without its checksum) + 4); // nodeinfo's checksum // store the BP_SIZESs for (int i = 0; i < node->n_children; i++) { uint32_t len = sb[i].compressed_size + 4; // data and checksum BP_SIZE (*ndd,i) = len; BP_START(*ndd,i) = total_node_size; total_node_size += sb[i].compressed_size + 4; total_uncompressed_size += sb[i].uncompressed_size + 4; } // now create the final serialized node uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes. char *XMALLOC_N_ALIGNED(512, total_buffer_size, data); char *curr_ptr = data;
AI 代码解读
前面提到节点node序列化的过程分为3个阶段:
- partition序列化和压缩
- pivot key序列化和压缩
- header序列化
前2个阶段都讨论过了,header的部分是调用serialize_node_header实现的。
到这里其他部分的序列化和压缩工作都做好了,header的序列化直接在前面分配好的压缩后数据buffer上进行,不需要压缩,也不必分配sub_block数据结构。
header处理完,直接把pivot key的sub_block的compressed_ptr数据和checksum拷贝过来。
pivot key处理完,直接把每个partition的compressed_ptr和checksum依次拷贝过来。
pad的部分写0。
// write the header struct wbuf wb; wbuf_init(&wb, curr_ptr, serialize_node_header_size(node)); serialize_node_header(node, *ndd, &wb); assert(wb.ndone == wb.size); curr_ptr += serialize_node_header_size(node); // now write sb_node_info memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size); curr_ptr += sb_node_info.compressed_size; // write the checksum *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum); curr_ptr += sizeof(sb_node_info.xsum); for (int i = 0; i < npartitions; i++) { memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size); curr_ptr += sb[i].compressed_size; // write the checksum *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum); curr_ptr += sizeof(sb[i].xsum); } // Zero the rest of the buffer memset(data + total_node_size, 0, total_buffer_size - total_node_size); assert(curr_ptr - data == total_node_size); *bytes_to_write = data; *n_bytes_to_write = total_buffer_size; *n_uncompressed_bytes = total_uncompressed_size; invariant(*n_bytes_to_write % 512 == 0); invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0); return 0; }
AI 代码解读
假若一个node包含2个partition,它的序列化结构如下所示:
反序列化和解压缩过程详解
由于tokudb支持partial fetch(只读某几个partition)和partial evict(即把clean节点的部分partition释放掉),反序列化过程相比序列化过程略复杂一些。
fetch callback通过bfe这个hint告诉toku_deserialize_ftnode_from需要读那些partition。
bfe有五种类型:
- ftnode_fetch_none:只需要读header和pivot key,不需要读任何partition。只用于optimizer计算cost
- ftnode_fetch_keymatch:只需要读match某个key的partition,ydb层提供的一个接口,一般不用
- ftnode_fetch_prefetch:prefetch时使用
- ftnode_fetch_all:需要把所有partition读上来;写节点时使用(msg inject或者msg apply的子节点)
- ftnode_fetch_subset:需要读若干个partition,FT search路径上使用。
只有在ft search高度>1以上的中间节点时,read_all_partitions会被设置成true,走老的代码路径deserialize_ftnode_from_fd,一次性把所有partition都读到内存中。
其他情况会调用read_ftnode_header_from_fd_into_rbuf_if_small_enough,把节点的header读到内存中,然后反序列化header并设置ndd(每个partition的offset和size);解压缩和反序列化pivot key设置pivot信息;根据bfe读取需要的partition。
节点的header,pivot key和partition都有自己的checksum信息,解析每个部分时都要确认checksum是匹配的。
enum ftnode_fetch_type { ftnode_fetch_none = 1, // no partitions needed. ftnode_fetch_subset, // some subset of partitions needed ftnode_fetch_prefetch, // this is part of a prefetch call ftnode_fetch_all, // every partition is needed ftnode_fetch_keymatch, // one child is needed if it holds both keys }; int toku_deserialize_ftnode_from (int fd, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *ftnode, FTNODE_DISK_DATA* ndd, ftnode_fetch_extra *bfe ) // Effect: Read a node in. If possible, read just the header. { int r = 0; struct rbuf rb = RBUF_INITIALIZER; // each function below takes the appropriate io/decompression/deserialize statistics if (!bfe->read_all_partitions) { read_ftnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->ft, &rb, bfe); r = deserialize_ftnode_header_from_rbuf_if_small_enough(ftnode, ndd, blocknum, fullhash, bfe, &rb, fd); } else { // force us to do it the old way r = -1; } if (r != 0) { // Something went wrong, go back to doing it the old way. r = deserialize_ftnode_from_fd(fd, blocknum, fullhash, ftnode, ndd, bfe, NULL); } toku_free(rb.buf); return r; }
AI 代码解读
deserialize_ftnode_header_from_rbuf_if_small_enough比较长,基本是toku_serialize_ftnode_to_memory的相反过程。
header部分是不压缩的,直接解析,比较magic number,解析node->n_children和ndd等。
然后比较header的checksum
node->n_children = rbuf_int(rb); // Guaranteed to be have been able to read up to here. If n_children // is too big, we may have a problem, so check that we won't overflow // while reading the partition locations. unsigned int nhsize; nhsize = serialize_node_header_size(node); // we can do this because n_children is filled in. unsigned int needed_size; needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo. if (needed_size > rb->size) { r = toku_db_badformat(); goto cleanup; } XMALLOC_N(node->n_children, node->bp); XMALLOC_N(node->n_children, *ndd); // read the partition locations for (int i=0; i<node->n_children; i++) { BP_START(*ndd,i) = rbuf_int(rb); BP_SIZE (*ndd,i) = rbuf_int(rb); } uint32_t checksum; checksum = toku_x1764_memory(rb->buf, rb->ndone); uint32_t stored_checksum; stored_checksum = rbuf_int(rb); if (stored_checksum != checksum) { dump_bad_block(rb->buf, rb->size); r = TOKUDB_BAD_CHECKSUM; goto cleanup; }
AI 代码解读
接着处理pivot key,比较pivot key部分的checksum,解压缩,反序列化,设置pivot信息。
// Finish reading compressed the sub_block const void **cp; cp = (const void **) &sb_node_info.compressed_ptr; rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size); sb_node_info.xsum = rbuf_int(rb); // let's check the checksum uint32_t actual_xsum; actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr-8, 8+sb_node_info.compressed_size); if (sb_node_info.xsum != actual_xsum) { r = TOKUDB_BAD_CHECKSUM; goto cleanup; } // Now decompress the subblock { toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size); sb_node_info.uncompressed_ptr = sb_node_info_buf.get(); tokutime_t decompress_t0 = toku_time_now(); toku_decompress( (Bytef *) sb_node_info.uncompressed_ptr, sb_node_info.uncompressed_size, (Bytef *) sb_node_info.compressed_ptr, sb_node_info.compressed_size ); tokutime_t decompress_t1 = toku_time_now(); decompress_time = decompress_t1 - decompress_t0; // at this point sb->uncompressed_ptr stores the serialized node info. r = deserialize_ftnode_info(&sb_node_info, node); if (r != 0) { goto cleanup; } }
AI 代码解读
最后是根据bfe读取需要的partition,读partition是通过调用pf_callback实现的。
// Now we have the ftnode_info. We have a bunch more stuff in the // rbuf, so we might be able to store the compressed data for some // objects. // We can proceed to deserialize the individual subblocks. // setup the memory of the partitions // for partitions being decompressed, create either message buffer or basement node // for partitions staying compressed, create sub_block setup_ftnode_partitions(node, bfe, false); // We must capture deserialize and decompression time before // the pf_callback, otherwise we would double-count. t1 = toku_time_now(); deserialize_time = (t1 - t0) - decompress_time; // do partial fetch if necessary if (bfe->type != ftnode_fetch_none) { PAIR_ATTR attr; r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr, NULL); if (r != 0) { goto cleanup; } }
AI 代码解读
deserialize_ftnode_from_fd的部分留给读者自行分析。