开发者

Lucene词向量索引文件构建源码解析

开发者 https://www.devze.com 2022-11-28 13:39 出处:网络 作者: 沧叔解码
目录背景特殊说明源码解读工具类FieldsIndexWriter核心类TermVectorsConsumerLucene90CompressingTermVectorsWriter父类TermVect...
目录
  • 背景
  • 特殊说明
  • 源码解读
    • 工具类
      • FieldsIndexWriter
    • 核心类
      • TermVectorsConsumer
      • Lucene90CompressingTermVectorsWriter
    • 父类
      • TermVectorsWriter
      • 成员变量
    • 内部类
      • DocData
      • FieldData
    • 构造方法
      • 核心方法
        • startDocument
        • startField
        • startTerm
        • addPosition
        • finishField
        • finishDocument
        • triggerFlush
      • 构建chunk
        • flush
        • flushNumFields
        • flushFieldNums
        • flushFields
        • flushFlags
        • flushNumTerms
        • flushTermLengths
        • flushTermFreqs
        • flushPositions
        • flushOffsets
        • flushPayloadLengths
        • finish
      • TermVectorsConsumerPerField
      • 索引文件格式
        • tvm
          • 字段详解
            • Header
            • PackedItsVersion
            • ChunkSize
            • NumChunks
            • NumDirtyChunks
            • NumDirtyDocs
            • NumDocs
            • blockShift
            • TotalChunks + 1
            • tvxDocStartFP
            • DocBlockMeta
            • tvxOffsetStartFP
            • OffsetBlockMeta
            • SPEndPoint
            • MaxPointer
            • Footer
          • tvd 字段详解
            • Header
            • chunk
            • Footer
          • tvx 字段详解
            • Header
            • ChunkStartDocIDs
            • ChunkTVDOffsetsBlock
            • Footer

        背景

        词向量存储的信息内容其实和倒排(Posting)是一样的,也是每个term所出现的文档列表以及在文档中的位置信息,区别在于存储结构的不同:

        Posting的存储结构是:field -> term -> doc -> freq/pos/offset

        也就是说Posting是从字段定位到term,再定位到文档,获取位置信息。

        TermVector的存储结构是:doc -> field -> term -> freq/pos/offset

        TermVector是从文档定位到字段,再定位term,获取位置信息。

        从上面的介绍中,我们可以看出一些基本的规律:

        • query查询是通过Posting来查找匹配的文档的,因为query就是从field中查找匹配的term,顺着Posting的结构,下一步就能得到所有匹配的文档了。
        • 从指定文档中获取指定字段单个term的匹配位置,则TermVector(doc查找1次,field查找1次,term查找1次,)和Posting(field查找1次,term查找1次,doc查找1次)效率差不多。
        • 从指定文档中获取指定字段多个term的匹配位置,则TermVector(doc查找1次,field查找1次,term查找n次)性能比Posting(field查找1次,term查找n次,doc查找n次)好。
        • 多字段多term查询位置信息也是TermVector性能比较好,大家可以自行分析。

        从上面的规律可以看出,检索过程确实使用的Posting,毕竟这是真的倒排索引。TermVector适用于从特定文档中获取某些字段中term的位置信息,典型应用就是高亮:获取特定的文档中相关term的位置。

        特殊说明

        词向量构建涉及到几个类之间的关系

        词向量的构建中主要有3个类:

        • TermVectorsConsumer:调度词向量的构建的最上层逻辑,负责创建TermVectorsConsumerPerField和Lucene90CompressingTermVectorsWriter。
        • TermVectorsConsumerPerField:每个开启词向量构建的字段都对应一个TermVectorsConsumerPerField,在TermVectorsConsumerPerField中是把词向量的倒排信息临时存在内存buffer中,在完成一个document的处理之后,会把这个document的所有词向量数据都序列化到Lucene90CompressingTermVectorsWriter的缓存中,然后重置buffer,等到处理下一个document。
        • Lucene90CompressingTermVectorsWriter:负责组织词向量的索引文件格式并持久化。

        term的存储

        在词向量中的term是按序存储的,但是每个Field中的所有的term,除了完整的存储第一个term之外,其他term都是存储除了跟前一个term的最长公共前缀的剩余的后缀部分。

        chunk的生成条件

        词向量是使用chunk来划分数据的,生成chunk的条件满足以下二者其一即可:

        • 缓存中的doc数量超出设置的阈值
        • suffix的数据总量超出了设置的阈值

        词向量的索引文件

        词向量最终构建生成3个索引文件:

        • tvd:按chunk存储的term,freq,position,offset,payload信息
        • tvx:chunk的索引文件,记录的是每个chunk的起始docID,以及每个chunk的起始位置,方便根据docID快速定位到chunk。
        • tvm:词向量索引文件的元信息,用来读取词向量使用的。

        源码解读

        注意:本文源码基于lucene-9.1.0版本

        工具类

        FieldsIndexWriter

        FieldsIndexWriter这个工具类,以后介绍正排索引文件也会用到,它主要是用来生成所有chunk中的起始doc编号和chunk在数据文件中的位置信息,方便读取的时候快速定位doc所属的chunk,并从文件中读取chunk。总体逻辑是先使用临时文件存储前面说的两个信息,在真正生成索引文件的时候,使用DirectMonotonicWriter进行压缩存储,减小索引文件大小。

        public final class FieldsIndexWriter implements Closeable {
          static final int VERSION_START = 0;
          static final int VERSION_CURRENT = 0;
          private final Directory dir;
          // 下面这些信息都是用来创建真正索引文件名的  
          private final String name;
          private final String suffix;
          private final String extension;
          private final String codecName;
          private final byte[] id;
          // DirectMonotonicWriter 所需的参数
          private final int blockShift;
          private final IOContext ioContext;
          // 临时文件,用来保存所有chunk中的文档数  
          private IndexOutput docsOut;
          // 临时文件,用来保存所有chunk在数据文件中的起始位置  
          private IndexOutput filePointersOut;
          // doc总数  
          private int totalDocs;
          // chunk总数  
          private int totalChunks;
          // 前一个chunk在tvd索引文件中的起始位置  
          private long previousFP;
          // 添加一个新的index,index就是用来定位doc属于哪个chunk,以及chunk在数据文件中的起始位置。
          // numDocs是chunk中的文档总数,后面真正序列化到正式的索引文件会通过换算,得到的是每个chunk的起始docID。  
          void writeIndex(int numDocs, long startPointer) throws IOException {
            assert startPointer >= previousFP;
            docsOut.writeVInt(numDocs);
            filePointersOut.writeVLong(startPointer - previousFP);
            previousFP = startPointer;
            totalDocs += numDocs;
            totalChunks++;
          }
          // metaOut是元信息索引文件  
          void finish(int numDocs, long maxPointer, IndexOutput metaOut) throws IOException {
            if (numDocs != totalDocs) {
              throw new IllegalStateException("Expected " + numDocs + " docs, but got " + totalDocs);
            }
            // 完成临时文件的写入  
            CodecUtil.writeFooter(docsOut);
            CodecUtil.writeFooter(filePointersOut);
            IOUtils.close(docsOut, filePointersOut);
            // 创建真正的索引文件  
            try (IndexOutput dataOut =
                dir.createOutput(IndexFileNames.segmentFileName(name, suffix, extension), ioContext)) {
              CodecUtil.writeIndexHeader(dataOut, codecName + "Idx", VERSION_CURRENT, id, suffix);
              // chunk中的doc总数
              metaOut.writeInt(numDocs);
              // 后面使用DirectMonotonicWriter压缩的时候需要的参数 
              metaOut.writeInt(blockShift);
              // 使用DirectMonotonicWriter写入的数据总数,为什么加1,看下面的具体写入逻辑。
              metaOut.writeInt(totalChunks + 1);
              // chunk索引文件中ChunkStartDocIDs的起始位置  
              metaOut.writeLong(dataOut.getFilePointer());
              try (ChecksumIndexInput docsIn =
                  dir.openChecksumInput(docsOut.getName(), IOContext.READONCE)) {
                CodecUtil.checkHeader(docsIn, codecName + "Docs", VERSION_CURRENT, VERSION_CURRENT);
                Throwable priorE = null;
                try {
                  // 压缩存储所有chunk的起始doc编号  
                  final DirectMonotonicWriter docs =
                      DirectMonotonicWriter.getInstance(metaOut, dataOut, totalChunks + 1, blockShift);
                  long doc = 0;
                  docs.add(doc); // 第一个chunk的起始doc编号肯定是0,这也是上面totalChunks + 1的原因之一。
                  for (int i = 0; i < totalChunks; ++i) {
                    doc += docsIn.readVInt();
                    docs.add(doc);
                  }
                  docs.finish();
                  if (doc != totalDocs) {
                    throw new CorruptIndexException("Docs don't add up", docsIn);
                  }
                } catch (Throwable e) {
                  priorE = e;
                } finally {
                  CodecUtil.checkFooter(docsIn, priorE);
                }
              }
              // 删除临时文件  
              dir.deleteFile(docsOut.getName());
              docsOut = null;
              // chunk索引文件中ChunkOffsets的起始位置
              metaOut.writeLong(dataOut.getFilePointer());
              try (ChecksumIndexInput filePointersIn =
                  dir.openChecksumInput(filePointersOut.getName(), IOContext.READONCE)) {
                CodecUtil.checkHeader(
                    filePointersIn, codecName + "FilePointers", VERSION_CURRENT, VERSION_CURRENT);
                Throwable priorE = null;
                try {
                  // 压缩存储所有chunk在tvd文件中的起始位置  
                  final DirectMonotonicWriter filePointers =
                      DirectMonotonicWriter.getInstance(metaOut, dataOutjs, totalChunks + 1, blockShift);
                  long fp = 0;
                  for (int i = 0; i < totalChunks; ++i) {
                    fp += filePointersIn.readVLong();
                    filePointers.add(fp);
                  }
                  if (maxPointer < fp) {
                    throw new CorruptIndexException("File pointers don't add up", filePointersIn);
                  }
                  filePointers.add(maxPointer); // 上面totalChunks + 1的原因之二。
                  filePointers.finish();
                } catch (Throwable e) {
                  priorE = e;
                } finally {
                  CodecUtil.checkFooter(filePointersIn, priorE);
                }
              }
              dir.deleteFile(filePointersOut.getName());
              filePointersOut = null;
              metaOut.writeLong(dataOut.getFilePointer());
              metaOut.writeLong(maxPointer);
              CodecUtil.writeFooter(dataOut);
            }
          }
        }
        

        核心类

        TermVectorsConsumer

        class TermVectorsConsumer extends TermsHash {
          protected final Directory directory;
          protected final SegmentInfo info;
          protected final Codec codec;
          // 词向量持久化  
          TermVectorsWriter writer;
          /** Scratch term used by TermVectorsConsumerPerField.finishDocument. */
          final BytesRef flushTerm = new BytesRef();
          // 用来从TermVectorsConsumerPerField的bytepool中读取position信息
          final ByteSliceReader vectorSliceReaderPos = new ByteSliceReader();
          final ByteSliceReader vectorSliceReaderOff = new ByteSliceReader();
          private boolean hasVectors;
          private int numVectorFields;
          int lastDocID;
          // 每一个开启词向量构建的Field,都有一个TermVectorsConsumerPerField,当一个doc处理完之后会把所有的
          // TermVectorsConsumerPerField都序列化到Lucene90CompressingTermVectorsWriter中,然后重置等待处理下一个doc
          private TermVectorsConsumerPerField[] perFields = new TermVectorsConsumerPerField[1];
          Accountable accountable = Accountable.NULL_ACCOUNTABLE;
          TermVectorsConsumer(
              final IntBlockPool.Allocator intBlockAllocator,
              final ByteBlockPool.Allocator byteBlockAllocator,
              Directory directory,
              SegmentInfo info,
              Codec codec) {
            super(intBlockAllocator, byteBlockAllocator, Counter.newCounter(), null);
            this.directory = directory;
            this.info = info;
            this.codec = codec;
          }
          @Override
          void flush(
              Map<String, TermsHashPerField> fieldsToFlush,
              final SegmentWriteState state,
              Sorter.DocMap sortMap,
              NormsProducer norms)
              throws IOException {
            if (writer != null) {
              int numDocs = state.segmentInfo.maxDoc();
              try {
                // 把不存在词向量Filed的文档填充下  
                fill(numDocs);
                assert state.segmentInfo != null;
                // 触发词向量索引文件持久化落盘   
                writer.finish(numDocs);
              } finally {
                IOUtils.close(writer);
              }
            }
          }
          /**
           * Fills in no-term-vectors for all docs we haven't seen since the last doc that had term vectors.
           */
          void fill(int docID) throws IOException {
            while (lastDocID < docID) {
              writer.startDocument(0);
              writer.finishDocument();
              lastDocID++;
            }
          }www.devze.com
          // 创建  Lucene90CompressingTermVectorsWriter
          void initTermVectorsWriter() throws IOException {
            if (writer == null) {
              IOContext context = new IOContext(new FlushInfo(lastDocID, bytesUsed.get()));
              writer = codec.termVectorsFormat().vectorsWriter(directory, info, context);
              lastDocID = 0;
              accountable = writer;
            }
          }
          void setHasVectors() {
            hasVectors = true;
          }
          @Override
          void finishDocument(int docID) throws IOException {
            // 不存在词向量,直接返回
            if (!hasVectors) {
              return;
            }
            // 按字段名排序TermVectorsConsumerPerField
            ArrayUtil.introSort(perFields, 0, numVectorFields);
            initTermVectorsWriter();
            // 为了确保doc是连续,则把lastDocID到docID的空白填充下
            fill(docID);
            // 开始序列化
            writer.startDocument(numVectorFields);
            // 处理document中的所有Field,会把相关的词向量数据写到writer的缓存中
            for (int i = 0; i < numVectorFields; i++) {
              perFields[i].finishDocument();
            }
            // 结束一个document词向量的序列化  
            writer.finishDocument();
            assert lastDocID == docID : "lastDocID=" + lastDocID + " docID=" + docID;
            lastDocID++;
            super.reset();
            // 重置TermVectorsConsumerPerField 数组,等待处理下一个document  
            resetFields();
          }
          @Override
          public void abort() {
            try {
              super.abort();
            } finally {
              IOUtils.closeWhileHandlingException(writer);
              reset();
            }
          }
          void resetFields() {
            Arrays.fill(perFields, null); // don't hang onto stuff from previous doc
            numVectorFields = 0;
          }
          @Override
          public TermsHashPerField addField(FieldInvertState invertState, FieldInfo fieldInfo) {
            return new TermVectorsConsumerPerField(invertState, this, fieldInfo);
          }
          // 当结束一个Field的所有term的处理之后,就把TermVectorsConsumerPerField存在perFields中,
          // 等待把数据都序列化到Lucene90CompressingTermVectorsWriter中
          void addFieldToFlush(TermVectorsConsumerPerField fieldToFlush) {
            if (numVectorFields == perFields.length) {
              int ne编程客栈wSize = ArrayUtil.oversize(numVectorFields + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
              TermVectorsConsumerPerField[] newArray = new TermVectorsConsumerPerField[newSize];
              System.arraycopy(perFields, 0, newArray, 0, numVectorFields);
              perFields = newArray;
            }
            perFields[numVectorFields++] = fieldToFlush;
          }
          @Override
          void startDocument() {
            resetFields();
            numVectorFields = 0;
          }
        }
        

        Lucene90CompressingTermVectorsWriter

        Lucene90CompressingTermVectorsWriter是生成词向量索引文件的核心类,主要负责按照特定的索引文件格式组织数据并持久化。

        父类

        TermVectorsWriter

        Lucene90CompressingTermVectorsWriter的父类有大量的抽象方法,剩下一个模板方法addProx用来添加term在field中的所有的位置信息。

        public abstract class TermVectorsWriter implements Closeable, Accountable {
          /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
          protected TermVectorsWriter() {}
          // 开始持久化一个doc的所有词向量
          public abstract void startDocument(int numVectorFields) throws IOException;
          // 结束一个文档持久化的时候调用
          public void finishDocument() throws IOException {};
          // 开始持久化一个Field
          public abstract void startField(
              FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads)
              throws IOException;
          // 结束一个Field的处理
          public void finishField() throws IOException {};
          // 开始持久化一个term的倒排信息
          public abstract void startTerm(BytesRef term, int freq) throws IOException;
          // 结束一个term的处理
          public void finishTerm() throws IOException {}
          // 构建term的一个position信息
          public abstract void addPosition(int position, int startOffset, int endOffset, BytesRef payload)
              throws IOException;
          // 所有文档处理完成,在close方法调用之前,调用finish,numDoc是处理的文档总数
          public abstract void finish(int numDocs) throws IOException;
          // 从positions和offsets中读取所有的位置信息,其实就是从TermVectorsConsumerPerField#bytePool中读取
          public void addProx(int numProx, DataInput positions, DataInput offsets) throws IOException {
            int position = 0;
            int lastOffset = 0;
            BytesRefBuilder payload = null;
            for (int i = 0; i < numProx; i++) {
              final int startOffset;
              final int endOffset;
              final BytesRef thisPayload;
              if (positions == null) {
                position = -1;
                thisPayload = null;
              } else {
                int code = positions.readVInt();
                position += code >>> 1;
                if ((code & 1) != 0) {
                  final int payloadLength = positions.readVInt();
                  if (payload == null) {
                    payload = new BytesRefBuilder();
                  }
                  payload.grow(payloadLength);
                  positions.readBytes(payload.bytes(), 0, payloadLength);
                  payload.setLength(payloadLength);
                  thisPayload = payload.get();
                } else {
                  thisPayload = null;
                }
              }
              if (offsets == null) {
                startOffset = endOffset = -1;
              } else {
                startOffset = lastOffset + offsets.readVInt();
                endOffset = startOffset + offsets.readVInt();
                lastOffset = endOffset;
              }
              // 子类实现的真正的添加  
              addPosition(position, startOffset, endOffset, thisPayload);
            }
          }
            // 删除了一些跟merge相关的方法,以后介绍merge的时候再说
          @Override
          public abstract void close() throws IOException;
        }
        

        成员变量

          // sement的名称
          private final String segment;
          // 生成tvx索引文件
          private FieldsIndexWriter indexWriter;
          // metaStream:生成tvm索引文件
          // vectorStream:生成tvd索引文件
          private IndexOutput metaStream, vectoRSStream;
          // 压缩算法
          private final CompressionMode compressionMode;
          private final Compressor compressor;
          // tvx中的chunk大小是2^chunkSize
          private final int chunkSize;
          // chunk总数
          private long numChunks; 
          // 如果一个chunk中包含的doc信息是不完整的,则算一次
          private long numDirtyChunks; 
          // 在dirtyChunk中的doc总数
          private long numDirtyDocs; 
          // 处理的doc总数
          private int numDocs;
          // 构建过程中暂时存储的DocData,触发flush的话就会持久化
          private final Deque<DocData> pendingDocs;
          // 当前正在处理的doc
          private DocData curDoc;
          // 当前正在处理的field
          private FieldData curField;
          // 上一个处理的term
          private final BytesRef lastTerm;
          // 全局临时存储的buf
          private int[] positionsBuf, startOffsetsBuf, lengthsBuf, payloadLengthsBuf;
          // 存储后缀
          private final ByteBuffersDataOutput termSuffixes;
          // 存储payload信息
          private final ByteBuffersDataOutput payloadBytes;
          // 批量整型的压缩工具
          private final BlockPackedwriter writer;
          // 一个chunk中最多的文档数
          private final int maxDocsPerChunk; 
          private final ByteBuffersDataOutput scratchBuffer = ByteBuffersDataOutput.newResettableInstance();
        

        内部类

        DocData

        表示当前要序列化的一个doc的所有的词向量数据信息。

        private class DocData {
          // doc中有多少个field  
          final int numFields;
          // 每个field的词向量信息  
          final Deque<FieldData> fields;
          // 当前doc在全局buffer(positionsBuf, startOffsetsBuf, payloadLengthsBuf)中的起始位置  
          final int posStart, offStart, payStart;
          DocData(int numFields, int posStart, int offStart, int payStart) {
            this.numFields = numFields;
            this.fields = new ArrayDeque<>(numFields);
            this.posStart = posStart;
            this.offStart = offStart;
            this.payStart = payStart;
          }
          // 新增一个field  
          FieldData addField(
              int fieldNum, int numTerms, boolean positions, boolean offsets, boolean payloads) {
            final FieldData field;
            if (fields.isEmpty()) {
              field =
                  new FieldData(
                      fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart);
            } else {
              final FieldData last = fields.getLast();
              // 计算当前field的一些起始位置,也就是前一个field的起始位置+前一个field的所有的数据量  
              final int posStart = last.posStart + (last.hASPositions ? last.totalPositions : 0);
              final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions : 0);
              final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions : 0);
              field =
                  new FieldData(
                      fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart);
            }
            fields.add(field);
            return field;
          }
        }
        

        FieldData

        存储一个field的所有的词向量的所需的数据。

        private class FieldData {
          final boolean hasPositions, hasOffsets, hasPayloads;
          // flags是个混合标记位,标记是否需要构建position,offset,payload  
          final int fieldNum, flags, numTerms;
          // freqs:存储的是每个term的频率
          // prefixLengths:存储的是当前term和前一个term的公共前缀的长度
          // suffixLengths:存储的是除了当前term和前一个term的公共前缀的剩余部分的长度
          final int[] freqs, prefixLengths, suffixLengths;
          // 当前Field的position,offset,payload数据在全局buf中的起始位置  
          final int posStart, offStart, payStart;
          int totalPositions;
          // 当前处理的是第几个term  
          int ord;
          FieldData(
              int fieldNum,
              int numTerms,
              boolean positions,
              boolean offsets,
              boolean payloads,
              int posStart,
              int offStart,
              int payStart) {
            this.fieldNum = fieldNum;
            this.numTerms = numTerms;
            this.hasPositions = positions;
            this.hasOffsets = offsets;
            this.hasPayloads = payloads;
            this.flags =
                (positions ? POSITIONS : 0) | (offsets ? OFFSETS : 0) | (payloads ? PAYLOADS : 0);
            this.freqs = new int[numTerms];
            this.prefixLengths = new int[numTerms];
            this.suffixLengths = new int[numTerms];
            this.posStart = posStart;
            this.offStart = offStart;
            this.payStart = payStart;
            totalPositions = 0;
            ord = 0;
          }
          // 新增一个term
          // prefixLength:和前一个term的最长公共前缀
          // suffixLength:除了prefix剩下的就是suffix  
          void addTerm(int freq, int prefixLength, int suffixLength) {
            freqs[ord] = freq;
            prefixLengths[ord] = prefixLength;
            suffixLengths[ord] = suffixLength;
            ++ord;
          }
          // 为当前处理的term新增一个位置信息数据,数据都是暂存在全局的buffer中
          void addPosition(int position, int startOffset, int length, int payloadLength) {
            if (hasPositions) {
              if (posStart + totalPositions == positionsBuf.length) {
                positionsBuf = ArrayUtil.grow(positionsBuf);
              }
              positionsBuf[posStart + totalPositions] = position;
            }
            if (hasOffsets) {
              if (offStart + totalPositions == startOffsetsBuf.length) {
                final int newLength = ArrayUtil.oversize(offStart + totalPositions, 4);
                startOffsetsBuf = ArrayUtil.growExact(startOffsetsBuf, newLength);
                lengthsBuf = ArrayUtil.growExact(lengthsBuf, newLength);
              }
              startOffsetsBuf[offStart + totalPositions] = startOffset;
              lengthsBuf[offStart + totalPositions] = length;
            }
            if (hasPayloads) {
              if (payStart + totalPositions == payloadLengthsBuf.length) {
                payloadLengthsBuf = ArrayUtil.grow(payloadLengthsBuf);
              }
              payloadLengthsBuf[payStart + totalPositions] = payloadLength;
            }
            ++totalPositions;
          }
        }
        

        构造方法

        Lucene90CompressingTermVectorsWriter(
            Directory directory,
            SegmentInfo si,
            String segmentSuffix,
            IOContext context,
            String formatName,
            CompressionMode compressionMode,
            int chunkSize,
            int maxDocsPerChunk,
            int blockShift)
            throws IOException {
          assert directory != null;
          this.segment = si.name;
          this.compressionMode = compressionMode;
          this.compressor = compressionMode.newCompressor();
          this.chunkSize = chunkSize;
          this.maxDocsPerChunk = maxDocsPerChunk;
          numDocs = 0;
          pendingDocs = new ArrayDeque<>();
          termSuffixes = ByteBuffersDataOutput.newResettableInstance();
          payloadBytes = ByteBuffersDataOutput.newResettableInstance();
          lastTerm = new BytesRef(ArrayUtil.oversize(30, 1));
          boolean success = false;
          try {
            metaStream =
                directory.createOutput(
                    IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_META_EXTENSION),
                    context);
            CodecUtil.writeIndexHeader(
                metaStream,
                VECTORS_INDEX_CODEC_NAME + "Meta",
                VERSION_CURRENT,
                si.getId(),
                segmentSuffix);
            assert CodecUtil.indexHeaderLength(VECTORS_INDEX_CODEC_NAME + "Meta", segmentSuffix)
                == metaStream.getFilePointer();
            vectorsStream =
                directory.createOutput(
                    IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_EXTENSION), context);
            CodecUtil.writeIndexHeader(
                vectorsStream, formatName, VERSION_CURRENT, si.getId(), segmentSuffix);
            assert CodecUtil.indexHeaderLength(formatName, segmentSuffix)
                == vectorsStream.getFilePointer();
            // 生成tvx索引文件
            indexWriter =
                new FieldsIndexWriter(
                    directory,
                    segment,
                    segmentSuffix,
                    VECTORS_INDEX_EXTENSION,
                    VECTORS_INDEX_CODEC_NAME,
                    si.getId(),
                    blockShift,
                    context);
            // 记录PackedInts的版本
            metaStream.writeVInt(PackedInts.VERSION_CURRENT);
            // 记录chunkSize  
            metaStream.writeVInt(chunkSize);
            writer = new BlockPackedWriter(vectorsStream, PACKED_BLOCK_SIZE);
            // 全局buffer,用来临时存储数据  
            positionsBuf = new int[1024];
            startOffsetsBuf = new int[1024];
            lengthsBuf = new int[1024];
            payloadLengthsBuf = new int[1024];
            success = true;
          } finally {
            if (!success) {
              IOUtils.closeWhileHandlingException(metaStream, vectorsStream, indexWriter, indexWriter);
            }
          }
        }
        

        核心方法

        startDocument

        要开始处理一个doc了,创建一个DocData来存储这个doc所有的数据信息。

          @Override
          public void startDocument(int numVectorFields) throws IOException {
            curDoc = addDocData(numVectorFields);
          }  
          private DocData addDocData(int numVectorFields) {
            FieldData last = null;
            // 逆序遍历pendingDocs列表,获取最后一个DocData,需要根据它来计算在下一个DocData在全局buffer中的起始offset
            for (Iterator<DocData> it = pendingDocs.descendingIterator(); it.hasNext(); ) {
              final DocData doc = it.next();
              if (!doc.fields.isEmpty()) {
                last = doc.fields.getLast();
                break;
              }
            }
            final DocData doc;
            if (last == null) {
              doc = new DocData(numVectorFields, 0, 0, 0); // 第一个doc
            } else {
              final int posStart = last.posStart + (last.hasPositions ? last.totalPositions : 0);
              final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions : 0);
              final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions : 0);
              doc = new DocData(numVectorFields, posStart, offStart, payStart);
            }
            pendingDocs.add(doc);
            return doc;
          }
        

        startField

        开始处理当前doc中的一个新的Field,创建FieldData,用来存储field的所有的数据信息。

          public void startField(
              FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads)
              throws IOException {
            curField = curDoc.addField(info.number, numTerms, positions, offsetsphp, payloads);
            lastTerm.length = 0;
          }
        

        startTerm

        计算当前term和前一个term的最长公共前缀。

          public void startTerm(BytesRef term, int freq) throws IOException {
            // 和前一个term的最长公共前缀  
            final int prefix;
            if (lastTerm.length == 0) {
              prefix = 0;
            } else {
              prefix = StringHelper.bytesDifference(lastTerm, term);
            }
            // FieldData新增term  
            curField.addTerm(freq, prefix, term.length - prefix);
            // 存储suffix  
            termSuffixes.writeBytes(term.bytes, term.offset + prefix, term.length - prefix);
            // 更新lastTerm
            if (lastTerm.bytes.length < term.length) {
              lastTerm.bytes = new byte[ArrayUtil.oversize(term.length, 1)];
            }
            lastTerm.offset = 0;
            lastTerm.length = term.length;
            System.arraycopy(term.bytes, term.offset, lastTerm.bytes, 0, term.length);
          }
        

        addPosition

        为当前处理的term新增一个位置相关的信息。

          public void addPosition(int position, int startOffset, int endOffset, BytesRef payload)
              throws IOException {
            assert curField.flags != 0;
            curField.addPosition(
                position, startOffset, endOffset - startOffset, payload == null ? 0 : payload.length);
            if (curField.hasPayloads && payload != null) {
              payloadBytes.writeBytes(payload.bytes, payload.offset, payload.length);
            }
          }
        

        finishField

        结束一个field的处理,就是简单把当前的curFiled清空,等待处理下一个field。

          public void finishField() throws IOException {
            curField = null;
          }
        

        finishDocument

        可以看到,在结束一个doc的处理时,会判断是否满足一个chunk的构建条件,如果满足的话则进行构建。

          public void finishDocument() throws IOException {
            payloadBytes.copyTo(termSuffixes);
            payloadBytes.reset();
            ++numDocs;
            if (triggerFlush()) { // 是否满足一个chunk
              flush(false); // 构建一个chunk
            }
            curDoc = null;
          }
        

        triggerFlush

        判断当前是否满足一个chunk的构建条件,二者满足其一即可:

        • termSuffixes的大小大于等于chunkSize
        • 当前待处理的doc总数大于等于maxDocsPerChunk
          private boolean triggerFlush() {
            return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
          }
        

        构建chunk

        flush

        flush触发构建chunk逻辑,里面主要是调度逻辑,按类别构建所需的词向量信息。

          private void flush(boolean force) throws IOException {
            // 当前要构建的chunk中有几个doc  
            final int chunkDocs = pendingDocs.size();
            numChunks++;
            if (force) { // 如果是强制构建chunk,可能是不满足chunk条件的,这种chunk被定义为dirtyChunk
              numDirtyChunks++;
              numDirtyDocs += pendingDocs.size();
            }
            // 构建chunk的索引信息
            indexWriter.writeIndex(chunkDocs, vectorsStream.getFilePointer());
            final int docBase = numDocs - chunkDocs;
            // chunk的起始docID  
            vectorsStream.writeVInt(docBase);
            final int dirtyBit = force ? 1 : 0;
            vectorsStream.writeVInt((chunkDocs &lt;&lt; 1) | dirtyBit);
            // 记录每个doc的field数量
            final int totalFields = flushNumFields(chunkDocs);
            if (totalFields &gt; 0) {
              // 记录当前chunk中所有Field的编号
              final int[] fieldNums = flushFieldNums();
              // 记录所有doc的所有field的编号
              flushFields(totalFields, fieldNums);
              // 记录所有doc的所有field的flag
              flushFlags(totalFields, fieldNums);
              // 记录所有doc的所有field的term数量
              flushNumTerms(totalFields);
              // 记录所有term的长度信息
              flushTermLengths();
              // 记录所有term的频率
              flushTermFreqs();
              // 记录所有term的position信息
              flushPositions();
              // 记录所有term的offset信息
              flushOffsets(fieldNums);
              // 记录所有position的payload信息
              flushPayloadLengths();
              // 记录所有的suffix
              byte[] content = termSuffixes.toArrayCopy();
              compressor.compress(content, 0, content.length, vectorsStream);
            }
            // 重置相关变量,等待处理下一个chunk
            pendingDocs.clear();
            curDoc = null;
            curField = null;
            termSuffixes.reset();
          }
        

        flushNumFields

        记录所有doc的字段总数,分为两种情况:

          private int flushNumFields(int chunkDocs) throws IOException {
            if (chunkDocs == 1) { // 如果chunk中只有一个doc,则就直接写这个doc的字段总数
              final int numFields = pendingDocs.getFirst().numFields;
              vectorsStream.writeVInt(numFields);
              return numFields;
            } else { // 否则,使用PackedInts压缩存储所有doc的字段数信息
              writer.reset(vectorsStream);
              int totalFields = 0;
              for (DocData dd : pendingDocs) {
                writer.add(dd.numFields);
                totalFields += dd.numFields;
              }
              writer.finish();
              return totalFields;
            }
          }
        
        • 如果只有一个doc,则单独记录这个doc的字段数
        • 否则,使用PackedInts压缩存储所有的doc的字段数

        flushFieldNums

          private int[] flushFieldNums() throws IOException {
            // chunk中所有term的编号按序存储  
            SortedSet<Integer> fieldNums = new TreeSet<>();
            for (DocData dd : pendingDocs) {
              for (FieldData fd : dd.fields) {
                fieldNums.add(fd.fieldNum);
              }
            }
            final int numDistinctFields = fieldNums.size();
            final int bitsRequired = PackedInts.bitsRequired(fieldNums.last());
            // b开发者_JS教程itsRequired最大就是32,所以低5位就够了
            final int token = (Math.min(numDistinctFields - 1, 0x07) << 5) | bitsRequired;
            vectorsStream.writeByte((byte) token);
            if (numDistinctFields - 1 >= 0x07) {
              vectorsStream.writeVInt(numDistinctFields - 1 - 0x07);
            }
            final PackedInts.Writer writer =
                PackedInts.getWriterNoHeader(
                    vectorsStream, PackedInts.Format.PACKED, fieldNums.size(), bitsRequired, 1);
            for (Integer fieldNum : fieldNums) {
              writer.add(fieldNum);
            }
            writer.finish();
            // Integer转int
            int[] fns = new int[fieldNums.size()];
            int i = 0;
            for (Integer key : fieldNums) {
              fns[i++] = key;
            }
            return fns;
          }
        

        flushFields

        存储doc中所有的field的编号。

          private void flushFields(int totalFields, int[] fieldNums) throws IOException {
            scratchBuffer.reset();
            // 使用  DirectWriter 压缩存储
            final DirectWriter writer =
                DirectWriter.getInstance(
                    scratchBuffer, totalFields, DirectWriter.bitsRequired(fieldNums.length - 1));
            for (DocData dd : pendingDocs) {
              for (FieldData fd : dd.fields) {
                final int fieldNumIndex = Arrays.binarySearch(fieldNums, fd.fieldNum);
                assert fieldNumIndex >= 0;
                writer.add(fieldNumIndex);
              }
            }
            writer.finish();
            vectorsStream.writeVLong(scratchBuffer.size());
            scratchBuffer.copyTo(vectorsStream);
          }
        

        flushFlags

        存储doc中所有field的flag,分为两种情况:

          private void flushFlags(int totalFields, int[] fieldNums) throws IOException {
            // 所有doc中相同的field是否都是一样的flag
            boolean nonChangingFlags = true;
            // 如果所有相同的field的flag都一样,则最后只存储这个数组  
            int[] fieldFlags = new int[fieldNums.length];
            Arrays.fill(fieldFlags, -1);
            outer:
            for (DocData dd : pendingDocs) { // 遍历所有的doc
              for (FieldData fd : dd.fields) { // 遍历所有的field
                final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
                assert fieldNumOff >= 0;
                if (fieldFlags[fieldNumOff] == -1) {
                  fieldFlags[fieldNumOff] = fd.flags;
                } else if (fieldFlags[fieldNumOff] != fd.flags) { // 有一个field不一样
                  nonChangingFlags = false;
                  break outer;
                }
              }
            }
            if (nonChangingFlags) { // 如果所有doc相同的field的flag都一样,
              // 写0标记这种情况
              vectorsStream.writeVInt(0);
              scratchBuffer.reset();
              final DirectWriter writer =
                  DirectWriter.getInstance(scratchBuffer, fieldFlags.length, FLAGS_BITS);
              for (int flags : fieldFlags) { // 每个field只写一个flag
                assert flags >= 0;
                writer.add(flags);
              }
              writer.finish();
              vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
              scratchBuffer.copyTo(vectorsStream);
            } else { // 需要记录所有doc中的所有field的flag
              // 写1标记这种情况
              vectorsStream.writeVInt(1);
              scratchBuffer.reset();
              final DirectWriter writer = DirectWriter.getInstance(scratchBuffer, totalFields, FLAGS_BITS);
              for (DocData dd : pendingDocs) { // 遍历doc
                for (FieldData fd : dd.fields) { // 遍历field
                  writer.add(fd.flags); // 记录field的flag
                }
              }
              writer.finish();
              vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
              scratchBuffer.copyTo(vectorsStream);
            }
          }
        
        • 如果所有doc中相关的field的flag都一样,则每个field的flag单独存储一份就可以
        • 否则,需要存储所有doc中所有field的flag

        flushNumTerms

        存储所有field的term数量,会先统计最大的term数量,用来获取最大的term数据值需要几个bit存储。

          private void flushNumTerms(int totalFields) throws IOException {
            int maxNumTerms = 0;
            // 获取最大的term数量的值  
            for (DocData dd : pendingDocs) { 
              for (FieldData fd : dd.fields) {
                maxNumTerms |= fd.numTerms;
              }
            }
            final int bitsRequired = DirectWriter.bitsRequired(maxNumTerms);
            vectorsStream.writeVInt(bitsRequired);
            scratchBuffer.reset();
            // 使用DirectWriter压缩存储  
            final DirectWriter writer = DirectWriter.getInstance(scratchBuffer, totalFields, bitsRequired);
            for (DocData dd : pendingDocs) {
              for (FieldData fd : dd.fields) {
                writer.add(fd.numTerms);
              }
            }
            writer.finish();
            vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
            scratchBuffer.copyTo(vectorsStream);
          }
        

        flushTermLengths

        分别存储term的prefixLength和suffixLength。

          private void flushTermLengths() throws IOException {
            // 存储prefixLength  
            writer.reset(vectorsStream);
            for (DocData dd : pendingDocs) {
              for (FieldData fd : dd.fields) {
                for (int i = 0; i < fd.numTerms; ++i) {
                  writer.add(fd.prefixLengths[i]);
                }
              }
            }
            writer.finish();
            // 存储suffixLength  
            writer.reset(vectorsStream);
            for (DocData dd : pendingDocs) {
              for (FieldData fd : dd.fields) {
                for (int i = 0; i < fd.numTerms; ++i) {
                  writer.add(fd.suffixLengths[i]);
                }
              }
            }
            writer.finish();
          }
        

        flushTermFreqs

        存储term的频率,这里有个小小的优化,为了提高压缩率。

          private void flushTermFreqs() throws IOException {
            writer.reset(vectorsStream);
            for (DocData dd : pendingDocs) {
              for (FieldData fd : dd.fields) {
                for (int i = 0; i < fd.numTerms; ++i) {
                  // 已经确定了fandroidreq肯定是大于等于1,减1是为了提高writer的压缩率,读取的时候加1就行了。
                  writer.add(fd.freqs[i] - 1);
                }
              }
            }
            writer.finish();
          }
        

        flushPositions

        差值存储所有的position。

          private void flushPositions() throws IOException {
            writer.reset(vectorsStream);
            for (DocData dd : pendingDocs) {
              for (FieldData fd : dd.fields) {
                if (fd.hasPositions) {
                  int pos = 0;
                  for (int i = 0; i < fd.numTerms; ++i) {
                    int previousPosition = 0;
                    for (int j = 0; j < fd.freqs[i]; ++j) {
                      final int position = positionsBuf[fd.posStart + pos++];
                      writer.add(position - previousPosition);
                      previousPosition = position;
                    }
                  }
                  assert pos == fd.totalPositions;
                }
              }
            }
            writer.finish();
          }
        

        flushOffsets

        offset的存储做了一个优化设计,原因是term出现的不同的offset跨度可能会比较大,如果把原始的offset用PackedInts进行存储,可能压缩率不会很高。因此,在正式存储offset之前,先计算平均的term长度,根据term出现的前后两个offset的position,可以估计两个position的距离,用真实的前后两个offset的距离减去这个估计的距离,就能使得offset的差值向0趋近,可以提高PackedInts的压缩率。

          private void flushOffsets(int[] fieldNums) throws IOException {
            // 至少一个字段开启了offset  
            boolean hasOffsets = false;
            // term在所有字段中出现的最后一个postition之和  
            long[] sumPos = new long[fieldNums.length];
            // term在所有字段中出现的最后一个startOffset之和  
            long[] sumOffsets = new long[fieldNums.length];
            for (DocData dd : pendingDocs) { // 遍历所有的doc
              for (FieldData fd : dd.fields) { // 遍历doc中的所有field
                hasOffsets |= fd.hasOffsets;
                if (fd.hasOffsets && fd.hasPositions) { // 如果字段开启了offset和position
                  // 查找在term数组中的下标  
                  final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
                  int pos = 0;
                  for (int i = 0; i < fd.numTerms; ++i) {
                    sumPos[fieldNumOff] += positionsBuf[fd.posStart + fd.freqs[i] - 1 + pos];
                    sumOffsets[fieldNumOff] += startOffsetsBuf[fd.offStart + fd.freqs[i] - 1 + pos];
                    pos += fd.freqs[i];
                  }
                  assert pos == fd.totalPositions;
                }
              }
            }
            if (!hasOffsets) {
              return;
            }
            final float[] charsPerTerm = new float[fieldNums.length];
            // 用  sumOffsets[i] / sumPos[i] 估计第i个term的长度
            for (int i = 0; i < fieldNums.length; ++i) {
              charsPerTerm[i] =
                  (sumPos[i] <= 0 || sumOffsets[i] <= 0) ? 0 : (float) ((double) sumOffsets[i] / sumPos[i]);
            }
            // tvd中存储charsPerTerm
            for (int i = 0; i < fieldNums.length; ++i) {
              vectorsStream.writeInt(Float.floatToRawIntBits(charsPerTerm[i]));
            }
            writer.reset(vectorsStream);
            for (DocData dd : pendingDocs) { // 遍历所有的doc
              for (FieldData fd : dd.fields) { // 遍历doc中所有的field
                if ((fd.flags & OFFSETS) != 0) { // 如果开启了offset
                  final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
                  final float cpt = charsPerTerm[fieldNumOff];
                  int pos = 0;
                  for (int i = 0; i < fd.numTerms; ++i) { // 遍历field中所有的term
                    int previousPos = 0; // 差值使用
                    int previousOff = 0; // 差值使用
                    for (int j = 0; j < fd.freqs[i]; ++j) { // 遍历term出现的所有位置
                      final int position = fd.hasPositions ? positionsBuf[fd.posStart + pos] : 0;
                      final int startOffset = startOffsetsBuf[fd.offStart + pos];
                      // (int) (cpt * (position - previousPos)):当前potition和前一个position之间的长度
                      // startOffset - previousOff再减去(int) (cpt * (position - previousPos))就把值降到最小
                      writer.add(startOffset - previousOff - (int) (cpt * (position - previousPos)));
                      previousPos = position;
                      previousOff = startOffset;
                      ++pos;
                    }
                  }
                }
              }
            }
            writer.finish();
            // lengths
            writer.reset(vectorsStream);
            for (DocData dd : pendingDocs) { // 遍历所有的doc
              for (FieldData fd : dd.fields) { // 遍历所有的Field
                if ((fd.flags & OFFSETS) != 0) { // 如果开启了offset
                  int pos = 0;
                  for (int i = 0; i < fd.numTerms; ++i) {
                    for (int j = 0; j < fd.freqs[i]; ++j) {
                      writer.add(
                          // 减去前缀长度和后缀长度也是为了把值变小,减少存储空间
                          lengthsBuf[fd.offStart + pos++] - fd.prefixLengths[i] - fd.suffixLengths[i]);
                    }
                  }
                  assert pos == fd.totalPositions;
                }
              }
            }
            writer.finish();
          }
        

        flushPayloadLengths

        存储所有的payload的长度信息。

          private void flushPayloadLengths() throws IOException {
            writer.reset(vectorsStream);
            for (DocData dd : pendingDocs) {
              for (FieldData fd : dd.fields) {
                if (fd.hasPayloads) {
                  for (int i = 0; i &lt; fd.totalPositions; ++i) {
                    writer.add(payloadLengthsBuf[fd.payStart + i]);
                  }
                }
              }
            }
            writer.finish();
          }
        

        finish

        结束词向量索引文件的构建,把待处理doc列表中剩下的doc生成一个chunk。

          public void finish(int numDocs) throws IOException {
            if (!pendingDocs.isEmpty()) { // 如果还有待处理的doc,则强制生成一个chunk
              flush(true);
            }
            if (numDocs != this.numDocs) {
              throw new RuntimeException(
                  "Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs);
            }
            // 生成tvx索引文件  
            indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream);
            metaStream.writeVLong(numChunks);
            metaStream.writeVLong(numDirtyChunks);
            metaStream.writeVLong(numDirtyDocs);
            CodecUtil.writeFooter(metaStream);
            CodecUtil.writeFooter(vectorsStream);
          }
        

        TermVectorsConsumerPerField

        TermVectorsConsumerPerField是TermsHashPerField的子类,TermsHashPerField在之前介绍倒排的时候已经非常详细地介绍过了。

        在本文中,我们重点介绍不一样的地方。在介绍倒排的时候使用的是FreqProxTermsWriterPerField,它存储了所有的倒排数据,在所有的文档都处理完了之后才进行序列化和持久化,TermVectorsConsumerPerField和它最大的区别是每处理完一个doc,就进行序列化然后重置等待处理下一个doc。

        在TermVectorsConsumerPerField的源码中,如果已经看明白之前倒排的逻辑,则大部分地方理解起来都比较容易,这里我们只看一个文档处理完之后进行序列化的逻辑,实际上在TermVectorsConsumerPerField中只负责调度Lucene90CompressingTermVectorsWriter进行操作:

          void finishDocument() throws IOException {
            // 如果没有开启词向量构建  
            if (doVectors == false) {
              return;
            }
            doVectors = false;
            // 当前field的term总数
            final int numPostings = getNumTerms();
            // 用来存储当前序列化的term
            final BytesRef flushTerm = termsWriter.flushTerm;
            TermVectorsPostingsArray postings = termVectorsPostingsArray;
            // 序列化和持久化的核心类,实际上使用的实现类:Lucene90CompressingTermVectorsWriter 
            final TermVectorsWriter tv = termsWriter.writer;
            // 对term进行排序
            sortTerms();
            // 获取排序后的termID列表
            final int[] termIDs = getSortedTermIDs();
            // 开始处理一个Field
            tv.startField(fieldInfo, numPostings, doVectorPositions, doVectorOffsets, hasPayloads);
            // 用来从bytePool中读取position信息
            final ByteSliceReader posReader = doVectorPositions ? termsWriter.vectorSliceReaderPos : null;
            // 用来从bytePool中读取offset信息  
            final ByteSliceReader offReader = doVectorOffsets ? termsWriter.vectorSliceReaderOff : null;
            // 遍历所有的term
            for (int j = 0; j < numPostings; j++) {
              final int termID = termIDs[j];
              final int freq = postings.freqs[termID];
              // 当前处理的term存入flushTerm
              termBytePool.setBytesRef(flushTerm, postings.textStarts[termID]);
              // 准备序列化term的词向量信息  
              tv.startTerm(flushTerm, freq);
              if (doVectorPositions || doVectorOffsets) {
                if (posReader != null) {
                  initReader(posReader, termID, 0);
                }
                if (offReader != null) {
                  initReader(offReader, termID, 1);
                }
                // 序列化所有的position和offset信息
                tv.addProx(freq, posReader, offReader);
              }
              // 结束term的处理  
              tv.finishTerm();
            }
            // 结束Field的处理  
            tv.finishField();
            reset();
            fieldInfo.setStoreTermVectors();
          }
        

        索引文件格式

        tvm

        词向量索引文件的元信息,用来读取使用。

        Lucene词向量索引文件构建源码解析

        字段详解

        Header

        文件头部信息,主要是包括:

        • 文件头魔数(同一lucene版本所有文件相同)
        • 该文件使用的codec名称:Lucene90TermVectorsIndexMeta
        • codec版本
        • segment id(也是Segment_N文件中的N)
        • segment后缀名(一般为空)

        PackedItsVersion

        在词向量的索引文件中有很多数据是使用PackedIts压缩存储,该字段记录PackedInts的版本。、

        ChunkSize

        用来判断是否满足一个chunk的一种条件,如果chunk的大小超过了ChunkSize的限制,则可以构建一个chunk

        NumChunks

        chunk总数

        NumDirtyChunks

        dirtyChunk总数

        NumDirtyDocs

        dirtyChunk中的doc总数

        NumDocs

        doc总数

        BlockShift

        DirectMonotonicWriter需要的参数,DirectMonotonicWriter压缩存储会生成多个block,BlockShift决定了block的大小。

        TotalChunks + 1

        chunk总数 + 1,在生成tvx索引文件中ChunkStartDocIDs和ChunkTVDOffsets两个字段时,使用DirectMonotonicWriter写入的值的总数。

        tvxDocStartFP

        tvx索引文件中ChunkStartDocIDs的起始位置

        DocBlockMeta

        tvx索引文件中ChunkStartDocIDs使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。

        tvxOffsetStartFP

        tvx中ChunkTVDOffsets的起始位置

        OffsetBlockMeta

        tvx索引文件中ChunkTVDOffsets使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。

        SPEndPoint

        tvx文件的结束位置,后面是tvx的footer信息。

        MaxPointer

        tvd文件的结束位置,后面tvd的footer信息。

        Footer

        文件尾,主要包括

        • 文件尾魔数(同一个lucene版本所有文件一样)
        • 0
        • 校验码

        tvd 字段详解

        tvd索引文件主要是存储倒排信息中freq,position,offset,payload。

        Lucene词向量索引文件构建源码解析

        Header

        文件头部信息,主要是包括:

        • 文件头魔数(同一lucene版本所有文件相同)
        • 该文件使用的codec名称:Lucene90TermVectorsData
        • codec版本
        • segment id(也是Segment_N文件中的N)
        • segment后缀名(一般为空)

        chunk

        在词向量的构建过程中,

        • DocBase:Chunk中Doc的起始编号,Chunk中所有doc的真实编号需要加上这个DocBase
        • ChunkDocsCode:是ChunkDocs和isDirty的int组合体
          • ChunkDocs:chunk中的doc总数
          • isDirty:chunk中是否存在dirtyChunk
        • NumField:chunk中文档的字段个数。如果chunk中只有一个doc,则存储的就是这个doc的字段个数。否则使用packedInts压缩存储所有的doc的字段个数信息。
        • AllUniqueFieldNums:按序存储chunk中所有去重后的field的编号,用PackedInts压缩存储。
        • FieldNums:使用DirectWriter压缩存储所有doc的字段编号在AllUniqueFieldNums列表中的下标。
        • FieldFlagsCode:int类型的IsChangeFlag和DirectWriter压缩存储的FiledFlags
          • IsChangeFlag:是否有字段在不同的doc中的flag是不一样的。0表示所有的doc中的相同Field的flag都一样,1则不是。
          • FieldFlags:如果IsChangeFlag==0,则存储的是AllUniqueFieldNums中每个字段的flag,否则存储的就是所有doc中所有字段的flag。
        • FieldNumTerms:使用DirectWriter压缩存储所有的doc中的所有field的term的总数
        • PrefixLengths:使用PackedInts存储所有term和同字段中前一个term的最长公共前缀长度
        • SuffixLengths:使用PackedInts存储所有term和同字段中前一个term的扣除最长公共前缀剩下的后缀的长度
        • TermFreqs:使用PackedInts存储所有doc的所有字段中term出现的频率
        • Positions:使用PackedInts存储所有doc中所有Field中term出现的差值position
        • StartOffsets:使用PackedInts存储所有doc中所有Field中term出现的StartOffset,具体做了一些处理(看前面的源码分析),占用空间会更小。
        • Lengths:使用PackedInts存储所有doc中所有Field中term出现的EndOffset - StartOffset,具体做了一些处理(看前面的源码分析),占用空间会更小。
        • PayloadLengths:使用PackedInts存储所有的doc中所有字段中的term的payload长度
        • TermSuffixes:使用LZ4压缩存储所有term的suffix

        Footer

        文件尾,主要包括

        • 文件尾魔数(同一个lucene版本所有文件一样)
        • 0
        • 校验码

        tvx 字段详解

        tvx索引文件主要存储的是tvd索引文件中的一些索引信息,tvd中每个chunk的起始docID以及存储的起始位置。

        Lucene词向量索引文件构建源码解析

        Header

        文件头部信息,主要是包括:

        • 文件头魔数(同一lucene版本所有文件相同)
        • 该文件使用的codec名称:Lucene90TermVectorsIndexIdx
        • codec版本
        • segment id(也是Segment_N文件中的N)
        • segment后缀名(一般为空)

        ChunkStartDocIDs

        所有chunk的起始docID,使用DirectMonotonicWriter编码存储,会生成多个block。

        ChunkTVDOffsetsBlock

        所有chunk在tvd索引文件中的起始位置,使用DirectMonotonicWriter编码存储,会生成多个block。

        Footer

        文件尾,主要包括

        • 文件尾魔数(同一个lucene版本所有文件一样)
        • 0
        • 校验码

        以上就是Lucene词向量索引文件构建源码解析的详细内容,更多关于Lucene词向量索引文件构建的资料请关注我们其它相关文章!

        0

        精彩评论

        暂无评论...
        验证码 换一张
        取 消

        关注公众号