🌼 zipList

吞佛童子2022年10月10日
  • db
  • ziplist
大约 8 分钟

🌼 zipList

  • list & hash & zset 数据量较少时的 底层数据结构
  • hash
    • hash-max-ziplist-entries 512
      • 当 KV 对节点数 > 512 对时,转换为 dict
    • hash-max-ziplist-value 64
      • 当 val 长度 > 64 字节时,转换为 dict
  1. 存储形式
    • ziplist:
      • <zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
    • entry:
      • <prevlen> <len> <entry-data>
  2. 举例:
[0f 00 00 00] [0c 00 00 00] [02 00] [00 f3] [02 f6] [ff]
      |             |          |       |       |     |
   zlbytes        zltail    entries   "2"     "5"   end
  • 整个 ziplist 由以下一个部分组成(小端存储):
  • zlbytes
    • 整个 ziplist 占用的内存 字节数
    • 例中 = 15
  • zltail
    • ziplist 尾部节点 距离首字节的 偏移字节数
    • 例中 = 12,即首字节 0f 与最后一个节点首字节 02 之间的距离
  • zllen
    • 压缩列表 节点数
    • 例中 = 2,表示有 2 个 entry 节点
    • 16 位最多可以表示 2^16-1 个节点,当节点数 > 2^16-2 时,也可以继续存储,只是这样计算 节点数时,需要遍历 ziplist
  • entry
    • 存放实际数据的节点,每个 entry 有自己的内部结构
    • 容易引发 连锁反应
      • 背景:每个 entry 都会用 1 | 5 个字节来记录前一个节点的长度
      • 发生条件:在一个 1 字节记录前一个节点长度的 entry 前,插入一个 大于等于 254 字节的 entry,导致该节点需要多增 4 字节的空间,
        • 若同时,该节点增加 4 节点后,由 小于 254 字节的长度 --> 大于等于 254 字节,则会触发再下一个节点的内存扩展工作
        • 进而影响性能
  • zlend
    • ziplist 的结束点,恒为 0xff
  1. entry 详解
    • <prevlen> <len> <entry-data>
    • prevlen
      • prevlen < 254 Bytes : 1 字节记录
      • prevlen >= 254 Bytes : 5 字节记录 : 前一个字节 === 254(避免和 zlend 的 255 冲突) + 后 4 字节的实际长度
    • len
      • 根据第一个字节的不同,分为以下几种情况:
        • 00xx xxxx
        • 01xx xxxx
        • 10xx xxxx
        • 1100 0000: entry-data 为 2 字节的 int16_t 类型
        • 1101 0000: entry-data 为 4 字节的 int32_t 类型
        • 1110 0000: entry-data 为 8 字节的 int64_t 类型
        • 1111 0000: entry-data 为 3 字节长的 整数
        • 1111 1110: entry-data 为 1 字节长的 整数
        • 1111 xxxx: xxxx 就是 entry-data 的实际值,区间范围为 [0001, 1101],对应实际值为 [0, 12],而非 [1, 13],无需额外的 entry-data
    • entry-data
    • 例中:
      • [00 f3]
        • 00 表示 prevlen == 0 为首节点,f3 表示 entry-data 实际值为 3 - 1 = 2
      • [02 f6]
        • 02 表示 prevlen == 2,f6 表示 entry-data 实际值为 6 - 1 = 5
  2. 特点
    • 通过 zltail 可快速定位到 最后一个元素所在位置,复杂度为 O(1),其他节点复杂度为 O(N)
    • 紧凑模式,节省内存,适用于 节点数量较少时 的情况

1. 数据结构

  • ziplist 没有自定义 struct 之类来表达,而是使用 unsigned char * 表示
    • 原因在于 ziplist 本质上就是一块连续内存,内部组成又是 高度动态 的设计,因此无法通过一个固定的数据结构来表示
/* ziplist 数据结构,要么存放 string 要么存放 integer */
typedef struct {
    unsigned char *sval;
    unsigned int slen; /* When string is used, it is provided with the length (slen). */
    long long lval; /* When integer is used, 'sval' is NULL, and lval holds the value. */
} ziplistEntry;


/* entry 抽象数据结构 */
typedef struct zlentry {
    unsigned int prevrawlensize; /* Bytes used to encode the previous entry len*/
    unsigned int prevrawlen;     /* 前一个节点的长度 - 当小于 254 字节时,用 1 个字节记录长度;当大于等于 254 字节时,用 5 个字节记录长度 */
    unsigned int lensize;        /* entry 编码长度*/
    unsigned int len;            /* entry 实际长度,字节 */
    unsigned int headersize;     /* prevrawlensize + lensize. */
    unsigned char encoding;      /* 编码方式 */
    unsigned char *p;            /* 数据指针 */
} zlentry;

/* entry 数据结构 */
static inline void zipEntry(unsigned char *p, zlentry *e) {
    ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen); // 前一个节点的长度
    ZIP_ENTRY_ENCODING(p + e->prevrawlensize, e->encoding); // 编码长度
    ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len); // 实际数据长度
    assert(e->lensize != 0); /* check that encoding was valid. */
    e->headersize = e->prevrawlensize + e->lensize;
    e->p = p;
}

2. 创建

/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;
    zl[bytes-1] = ZIP_END;
    return zl;
}

3. 常用方法

// 前一个节点长度所占当前 entry 的字节数
#define ZIP_BIG_PREVLEN 254 
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do {                          
    if ((ptr)[0] < ZIP_BIG_PREVLEN) {                                          
        (prevlensize) = 1;                                                     
    } else {                                                                   
        (prevlensize) = 5;                                                     
    }                                                                          
} while(0)

方法概览

// 创建一个空的 ziplist,只包含 <zlbytes> <zltail> <zllen> <zlend>
unsigned char *ziplistNew(void);
// 将两条 ziplist 合并为一个新的 ziplist
unsigned char *ziplistMerge(unsigned char **first, unsigned char **second);
// 在 ziplist 的头部 | 尾部 插入数据,返回一个新的 ziplist
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where);
// 返回 index 处的数据项的 内存地址
unsigned char *ziplistIndex(unsigned char *zl, int index);
// 返回 ziplist 指定项 p 的后一项
unsigned char *ziplistNext(unsigned char *zl, unsigned char *p);
// 返回 ziplist 指定项 p 的前一项
unsigned char *ziplistPrev(unsigned char *zl, unsigned char *p);
unsigned int ziplistGet(unsigned char *p, unsigned char **sval, unsigned int *slen, long long *lval);
// 在数据项 p 的前面插入 s
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen);
// 删除指定数据项
unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p);
// 删除指定区间节点
unsigned char *ziplistDeleteRange(unsigned char *zl, int index, unsigned int num);
// 替换指定数据项
unsigned char *ziplistReplace(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen);
// 两条 ziplist 比较
unsigned int ziplistCompare(unsigned char *p, unsigned char *s, unsigned int slen);
// 查找给定数据,skip 表示每次查找要跳过的数据项,用于奇数存 key,偶数存 val 的情况
unsigned char *ziplistFind(unsigned char *zl, unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip);
unsigned int ziplistLen(unsigned char *zl);
size_t ziplistBlobLen(unsigned char *zl);
void ziplistRepr(unsigned char *zl);
typedef int (*ziplistValidateEntryCB)(unsigned char* p, unsigned int head_count, void* userdata);
int ziplistValidateIntegrity(unsigned char *zl, size_t size, int deep, ziplistValidateEntryCB entry_cb, void *cb_userdata);
void ziplistRandomPair(unsigned char *zl, unsigned long total_count, ziplistEntry *key, ziplistEntry *val);
void ziplistRandomPairs(unsigned char *zl, unsigned int count, ziplistEntry *keys, ziplistEntry *vals);
unsigned int ziplistRandomPairsUnique(unsigned char *zl, unsigned int count, ziplistEntry *keys, ziplistEntry *vals);
int ziplistSafeToAdd(unsigned char* zl, size_t add);

1) 删除节点

/* Delete "num" entries, starting at "p". Returns pointer to the ziplist. */
unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;
    size_t zlbytes = intrev32ifbe(ZIPLIST_BYTES(zl));

    zipEntry(p, &first); /* no need for "safe" variant since the input pointer was validated by the function that returned it. */
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLengthSafe(zl, zlbytes, p);
        deleted++;
    }

    assert(p >= first.p);
    totlen = p-first.p; /* Bytes taken by the element(s) to delete. */
    if (totlen > 0) {
        uint32_t set_tail;
        if (p[0] != ZIP_END) {
            /* Storing `prevrawlen` in this entry may increase or decrease the
             * number of bytes required compare to the current `prevrawlen`.
             * There always is room to store this, because it was previously
             * stored by an entry that is now being deleted. */
            nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);

            /* Note that there is always space when p jumps backward: if
             * the new previous entry is large, one of the deleted elements
             * had a 5 bytes prevlen header, so there is for sure at least
             * 5 bytes free and we need just 4. */
            p -= nextdiff;
            assert(p >= first.p && p<zl+zlbytes-1);
            zipStorePrevEntryLength(p,first.prevrawlen);

            /* Update offset for tail */
            set_tail = intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen;

            /* When the tail contains more than one entry, we need to take
             * "nextdiff" in account as well. Otherwise, a change in the
             * size of prevlen doesn't have an effect on the *tail* offset. */
            assert(zipEntrySafe(zl, zlbytes, p, &tail, 1));
            if (p[tail.headersize+tail.len] != ZIP_END) {
                set_tail = set_tail + nextdiff;
            }

            /* Move tail to the front of the ziplist */
            /* since we asserted that p >= first.p. we know totlen >= 0,
             * so we know that p > first.p and this is guaranteed not to reach
             * beyond the allocation, even if the entries lens are corrupted. */
            size_t bytes_to_move = zlbytes-(p-zl)-1;
            memmove(first.p,p,bytes_to_move);
        } else {
            /* The entire tail was deleted. No need to move memory. */
            set_tail = (first.p-zl)-first.prevrawlen;
        }

        /* Resize the ziplist */
        offset = first.p-zl;
        zlbytes -= totlen - nextdiff;
        zl = ziplistResize(zl, zlbytes);
        p = zl+offset;

        /* Update record count */
        ZIPLIST_INCR_LENGTH(zl,-deleted);

        /* Set the tail offset computed above */
        assert(set_tail <= zlbytes - ZIPLIST_END_SIZE);
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(set_tail);

        /* When nextdiff != 0, the raw length of the next entry has changed, so
         * we need to cascade the update throughout the ziplist */
        if (nextdiff != 0)
            zl = __ziplistCascadeUpdate(zl,p);
    }
    return zl;
}

2) 插入节点

/* 在 p 位置前插入一段新数据,待插入新数据的地址指针为 s,长度为 slen */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, newlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;

    /* 记录 p.prevlen,作为 s.prevlen */
    if (p[0] != ZIP_END) {
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLengthSafe(zl, curlen, ptail);
        }
    }

    /* 判断 编码方式 是否需要修改 */
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipStoreEntryEncoding will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    /* 计算 待插入数据如果正常插入需要的 字节数,包含 p.prevlen 和 encode 等的改变造成字节长度的变化 */
    reqlen += zipStorePrevEntryLength(NULL,prevlen);
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

    /* 计算 待插入数据 对源数据长度带来的改变 */
    int forcelarge = 0;
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    if (nextdiff == -4 && reqlen < 4) {
        nextdiff = 0;
        forcelarge = 1;
    }

    /* 计算新的 ziplist 的字节数,重新调整大小分配内存 */
    offset = p-zl;
    newlen = curlen+reqlen+nextdiff;
    zl = ziplistResize(zl,newlen);
    p = zl+offset;

    /* 将 p 数据向后移动 */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry's raw length in the next entry. */
        if (forcelarge)
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            zipStorePrevEntryLength(p+reqlen,reqlen);

        /* Update offset for tail */
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        assert(zipEntrySafe(zl, newlen, p+reqlen, &tail, 1));
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    p += zipStorePrevEntryLength(p,prevlen);
    p += zipStoreEntryEncoding(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}
上次编辑于: 2022/10/10 下午8:43:48
贡献者: liuxianzhishou