Project Backend Overview

The project backend consists of four files: VideoLoader extracts key frames from uploaded videos, CustomClipText converts the query text into vector data, CustomClipImage converts the extracted key frames into vector data, and SimpleIndexer performs retrieval over the vector data.

  • VideoLoader
  • CustomClipText
  • CustomClipImage
  • SimpleIndexer

VideoLoader

Basic Implementation

YAML Configuration

```yaml
jtype: VideoLoader
metas:
  py_modules:
    - video_loader.py
```

Importing Third-Party Libraries

```python
import io
import math
import os
import random
import re
import string
import tempfile
import time
import urllib.parse
import urllib.request
from copy import deepcopy
from pathlib import Path
from typing import Dict, Iterable, Optional

import ffmpeg
import librosa
import numpy as np
import webvtt
from docarray import Document, DocumentArray
from jina import Executor, requests
from jina.logging.logger import JinaLogger
from PIL import Image
```

Default Settings

```python
DEFAULT_FPS = 1.0
DEFAULT_AUDIO_BIT_RATE = 160000
DEFAULT_AUDIO_CHANNELS = 2
DEFAULT_AUDIO_SAMPLING_RATE = 44100
DEFAULT_SUBTITLE_MAP = '0:s:0'
```

Parameter explanations:

  • DEFAULT_FPS: default frames per second for extraction (here the sampling granularity is one frame per second; see the parsing sketch after this list)
  • DEFAULT_AUDIO_BIT_RATE: default audio bit rate
  • DEFAULT_AUDIO_CHANNELS: default number of audio channels
  • DEFAULT_AUDIO_SAMPLING_RATE: default audio sampling rate
  • DEFAULT_SUBTITLE_MAP: default output stream for the first subtitle track; see the FFmpeg basics guide for details
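To make the fps handling concrete, here is a minimal sketch of how the fps value is recovered from the ffmpeg vf filter string; it uses the same regular expression as `__init__` below and assumes only the constants above.

```python
import re

DEFAULT_FPS = 1.0
ffmpeg_video_args = {'vf': f'fps={DEFAULT_FPS}'}

# same pattern as in VideoLoader.__init__: pull the numeric fps back out
fps = re.findall(r'.*fps=(\d+(?:\.\d+)?).*', ffmpeg_video_args['vf'])
if fps:
    frame_fps = float(fps[0])
    print(frame_fps)  # -> 1.0, i.e. one extracted frame per second
```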

Class Initialization

```python
class VideoLoader(Executor):
    def __init__(
        self,
        modality_list: Iterable[str] = ('image', 'audio', 'text'),
        ffmpeg_video_args: Optional[Dict] = None,
        ffmpeg_audio_args: Optional[Dict] = None,
        ffmpeg_subtitle_args: Optional[Dict] = None,
        librosa_load_args: Optional[Dict] = None,
        copy_uri: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
```

Parameter explanations:

  • modality_list: the modalities to extract
  • ffmpeg_video_args: parameters for video frame extraction
  • ffmpeg_audio_args: parameters for audio extraction
  • ffmpeg_subtitle_args: parameters for subtitle extraction
  • librosa_load_args: parameters for converting audio data into tensors
  • copy_uri: whether to store the video's uri

```python
        self._modality = modality_list
        self._copy_uri = copy_uri
        self._ffmpeg_video_args = ffmpeg_video_args or {}
        self._ffmpeg_video_args.setdefault('format', 'rawvideo')
        self._ffmpeg_video_args.setdefault('pix_fmt', 'rgb24')
        self._ffmpeg_video_args.setdefault('frame_pts', True)
        self._ffmpeg_video_args.setdefault('vsync', 0)
        self._ffmpeg_video_args.setdefault('vf', f'fps={DEFAULT_FPS}')
        fps = re.findall(r'.*fps=(\d+(?:\.\d+)?).*', self._ffmpeg_video_args['vf'])
        if len(fps) > 0:
            self._frame_fps = float(fps[0])
```

Video-related parameter setup.

```python
        self._ffmpeg_audio_args = ffmpeg_audio_args or {}
        self._ffmpeg_audio_args.setdefault('format', 'wav')
        self._ffmpeg_audio_args.setdefault('ab', DEFAULT_AUDIO_BIT_RATE)
        self._ffmpeg_audio_args.setdefault('ac', DEFAULT_AUDIO_CHANNELS)
        self._ffmpeg_audio_args.setdefault('ar', DEFAULT_AUDIO_SAMPLING_RATE)
        self._ffmpeg_subtitle_args = ffmpeg_subtitle_args or {}
        self._ffmpeg_subtitle_args.setdefault('map', DEFAULT_SUBTITLE_MAP)
```

Audio- and subtitle-related parameter setup.

```python
        self._librosa_load_args = librosa_load_args or {}
        self._librosa_load_args.setdefault('sr', self._ffmpeg_audio_args['ar'])
        self._librosa_load_args.setdefault('mono', self._ffmpeg_audio_args['ac'] > 1)
```

Audio loading (librosa) parameter setup.

```python
        self.logger = JinaLogger(
            getattr(self.metas, 'name', self.__class__.__name__)
        ).logger
```

Set up the logger.

Video Extraction

See the official documentation for details on the @requests decorator.

Parameter explanations:

  • docs: the DocumentArray of Documents to be encoded
  • parameters: a dict of parameters controlling the encoding (keys include traversal_paths and batch_size)
```python
    @requests(on='/extract')
    def extract(self, docs: DocumentArray, parameters: Dict, **kwargs):
        t1 = time.time()
        print('video_loader extract', t1)
        for doc in docs:
            print(f'video chunks: {len(doc.chunks)}')
        for doc in docs:
            self.logger.info(f'received {doc.id}')
            if doc.uri == '':
                self.logger.error(f'No uri passed for the Document: {doc.id}')
                continue
            with tempfile.TemporaryDirectory() as tmpdir:
                source_fn = (
                    self._save_uri_to_tmp_file(doc.uri, tmpdir)
                    if self._is_datauri(doc.uri)
                    else doc.uri
                )
```

Read in the video file.

```python
                if 'image' in self._modality:
                    ffmpeg_video_args = deepcopy(self._ffmpeg_video_args)
                    ffmpeg_video_args.update(parameters.get('ffmpeg_video_args', {}))
                    frame_tensors = self._convert_video_uri_to_frames(
                        source_fn, doc.uri, ffmpeg_video_args
                    )
                    for idx, frame_tensor in enumerate(frame_tensors):
                        self.logger.debug(f'frame: {idx}')
                        chunk = Document(modality='image')
                        max_size = 240
                        img = Image.fromarray(frame_tensor)
                        if img.size[0] > img.size[1]:
                            width = max_size
                            height = math.ceil(max_size / img.size[0] * img.size[1])
                        else:
                            height = max_size
                            width = math.ceil(max_size / img.size[1] * img.size[0])
                        img = img.resize((width, height))
                        chunk.tensor = np.asarray(img).astype('uint8')
                        print(chunk.tensor.shape)
                        # chunk.tensor = np.array(frame_tensor).astype('uint8')
                        chunk.location = (np.uint32(idx),)
                        chunk.tags['timestamp'] = idx / self._frame_fps
                        if self._copy_uri:
                            chunk.tags['video_uri'] = doc.uri
                        doc.chunks.append(chunk)
                t2 = time.time()
                print(t2 - t1, t2)
```

Resize each frame and convert it to a tensor.


Frame data extraction

```python
    def _convert_video_uri_to_frames(self, source_fn, uri, ffmpeg_args):
        video_frames = []
        try:
            video = ffmpeg.probe(source_fn)['streams'][0]
            w, h = ffmpeg_args.get('s', f'{video["width"]}x{video["height"]}').split('x')
            w = int(w)
            h = int(h)
            out, _ = (
                ffmpeg.input(source_fn)
                .output('pipe:', **ffmpeg_args)
                .run(capture_stdout=True, quiet=True)
            )
            video_frames = np.frombuffer(out, np.uint8)
            video_frames = video_frames.reshape([-1, h, w, 3])
        except ffmpeg.Error as e:
            self.logger.error(f'Frame extraction failed, {uri}, {e.stderr}')
        return video_frames
```

Obtain the width and height of the video frames.
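For reference, a standalone sketch of how ffmpeg-python exposes the stream metadata used above ('sample.mp4' is a placeholder path; like the method above, this assumes the first stream is the video stream):

```python
import ffmpeg

# probe returns parsed ffprobe JSON; streams[0] holds the first stream
video = ffmpeg.probe('sample.mp4')['streams'][0]
w, h = int(video['width']), int(video['height'])
print(w, h)
```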

Audio data extraction

```python
    def _convert_video_uri_to_audio(self, source_fn, uri, ffmpeg_args, librosa_args):
        data = None
        sample_rate = None
        try:
            out, _ = (
                ffmpeg.input(source_fn)
                .output('pipe:', **ffmpeg_args)
                .run(capture_stdout=True, quiet=True)
            )
            data, sample_rate = librosa.load(io.BytesIO(out), **librosa_args)
        except ffmpeg.Error as e:
            self.logger.error(f'Audio extraction failed with ffmpeg, uri: {uri}, {e.stderr}')
        except librosa.LibrosaError as e:
            self.logger.error(f'Array conversion failed with librosa, uri: {uri}, {e}')
        finally:
            return data, sample_rate
```

Obtain the audio data and its sampling rate from the video.

Subtitle data extraction

```python
    def _convert_video_uri_to_subtitle(self, source_fn, ffmpeg_args, tmp_dir):
        subtitle_fn = str(os.path.join(tmp_dir, 'subs.srt'))
        subtitles = []
        print(ffmpeg_args)
        try:
            out, _ = (
                ffmpeg.input(source_fn)
                .output(subtitle_fn, **ffmpeg_args)
                .run(capture_stdout=True, quiet=True)
            )
            subtitles = self._process_subtitles(Path(subtitle_fn))
        except ffmpeg.Error as e:
            self.logger.error(f'Subtitle extraction failed with ffmpeg, {e.stderr}')
        finally:
            return subtitles
```

Obtain the processed subtitle text from the video.

```python
    def _process_subtitles(
        self, srt_path: Path, vtt_path: Path = None, tmp_srt_path: Path = None
    ):
        beg = None
        is_last_cap_complete = True
        subtitles = []
        prev_parts = []
        vtt_fn = self._convert_srt_to_vtt(srt_path, vtt_path, tmp_srt_path)
        for caption in webvtt.read(vtt_fn):
            cur_parts = [
                t for t in filter(lambda x: len(x.strip()) > 0, caption.text.split('\n'))
            ]
            filtered_text = ' '.join(cur_parts)
            if len(cur_parts) == 1:
                if cur_parts[0] in prev_parts:
                    continue
            if len(cur_parts) > 1:
                if cur_parts[0] in prev_parts and is_last_cap_complete:
                    filtered_text = ' '.join(cur_parts[1:])
            is_cur_complete = True
            if is_last_cap_complete:
                beg = caption.start_in_seconds
            if caption.text.startswith(' \n') or caption.text.endswith('\n '):
                is_cur_complete = False
            if is_cur_complete:
                if filtered_text:
                    subtitles.append((beg, caption.end_in_seconds, filtered_text))
            is_last_cap_complete = is_cur_complete
            prev_parts = cur_parts
        return subtitles
```

Split the subtitles according to their line-break structure.

```python
    def _remove_carriage_return(self, input_path, output_path=None):
        result = []
        with open(input_path, 'rb') as f:
            for l in f:
                if l == b'\r\n':
                    continue
                new_l = l.decode('utf8').replace('\r\n', '\n')
                new_l = new_l.rstrip('\n')
                result.append(new_l)
        if output_path is None:
            output_fn = f'{input_path.stem}_no_cr{input_path.suffix}'
            output_path = input_path.parent / output_fn
        with open(output_path, 'w') as f:
            f.write('\n'.join(result))
        return output_path
```

Strip all carriage returns.

```python
    def _convert_srt_to_vtt(
        self, srt_path: Path, vtt_path: Path = None, tmp_srt_path: Path = None
    ):
        if vtt_path is None:
            vtt_path = srt_path.parent / f'{srt_path.stem}.vtt'
        try:
            result = webvtt.from_srt(srt_path)
        except webvtt.errors.MalformedCaptionError as e:
            self.logger.warning('remove carriage returns from the .srt file')
            srt_path = self._remove_carriage_return(srt_path, tmp_srt_path)
            result = webvtt.from_srt(srt_path)
        result.save(output=vtt_path)
        return vtt_path
```

Convert the .srt subtitle file to a WebVTT file.

Miscellaneous

```python
    def _save_uri_to_tmp_file(self, uri, tmpdir):
        req = urllib.request.Request(uri, headers={'User-Agent': 'Mozilla/5.0'})
        tmp_fn = os.path.join(
            tmpdir,
            ''.join([random.choice(string.ascii_lowercase) for i in range(10)]) + '.mp4',
        )
        with urllib.request.urlopen(req) as fp:
            buffer = fp.read()
            binary_fn = io.BytesIO(buffer)
            with open(tmp_fn, 'wb') as f:
                f.write(binary_fn.read())
        return tmp_fn
```

Write the downloaded file to disk.

```python
    def _is_datauri(self, uri):
        scheme = urllib.parse.urlparse(uri).scheme
        return scheme in {'data'}
```

Check whether the URI is a data URI.

Going Further

Calling the Executor

Most functionality users are likely to need has already been published to Jina Hub; the main body of VideoLoader is also accessible there, so you can call the packaged Executor directly and build your own feature modules on top of it.

Flow configuration

```yaml
executors:
  - name: loader
    uses: 'jinahub://VideoLoader/latest'
```
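The same Flow can be built in Python; this is a sketch of the equivalent construction (the serving loop with .block() is assumed and not part of the project code):

```python
from jina import Flow

# pull the packaged Executor from Jina Hub, mirroring the YAML above
f = Flow().add(name='loader', uses='jinahub://VideoLoader/latest')
with f:
    f.block()  # keep the Flow serving until interrupted
```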

CustomClipText

Basic Implementation

YAML Configuration

```yaml
jtype: CLIPTextEncoder
metas:
  py_modules:
    - clip_text.py
```

Importing Third-Party Libraries

```python
import time
from typing import Dict, Optional

import clip
import torch
from docarray import DocumentArray
from jina import Executor, requests
```

Class Initialization

```python
class CLIPTextEncoder(Executor):
    def __init__(
        self,
        pretrained_model_name_or_path: str = 'ViT-B/32',
        base_tokenizer_model: Optional[str] = None,
        max_length: int = 77,
        device: str = 'cpu',
        traversal_paths: str = '@r',
        batch_size: int = 32,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
```

Parameter explanations:

  • pretrained_model_name_or_path: either an online repository on Hugging Face or a local directory; here the pretrained model is Vision Transformer-Base/32, which splits its input into 32×32 patches
  • base_tokenizer_model: the base tokenizer; if empty, it defaults to pretrained_model_name_or_path
  • max_length: the maximum length the tokenizer accepts, 77 for all CLIP models
  • device: the device used for preprocessing
  • traversal_paths: the traversal paths
  • batch_size: the batch size
```python
        self.traversal_paths = traversal_paths
        self.batch_size = batch_size
        self.pretrained_model_name_or_path = pretrained_model_name_or_path
        self.base_tokenizer_model = (
            base_tokenizer_model or pretrained_model_name_or_path
        )
```

This is the fallback ('or') logic for base_tokenizer_model described above.

```python
        self.max_length = max_length
        self.device = device
        model, preprocessor = clip.load(self.pretrained_model_name_or_path, device=device)
        self.preprocessor = preprocessor
        self.model = model
```

Text Encoding

```python
    @requests
    def encode(self, docs: DocumentArray, parameters: Dict, **kwargs):
        print('clip_text encode')
        for docs_batch in DocumentArray(
            filter(
                lambda x: bool(x.text),
                docs[parameters.get('traversal_paths', self.traversal_paths)],
            )
        ).batch(batch_size=parameters.get('batch_size', self.batch_size)):
            text_batch = docs_batch.texts
```

Parameter explanations:

  • docs: the DocumentArray of Documents to be encoded
  • parameters: a dict of parameters controlling the encoding (keys include traversal_paths and batch_size)

Filter by data type and batch all the text documents.

```python
            t1 = time.time()
            with torch.inference_mode():
                input_tokens = [
                    self.model.encode_text(clip.tokenize([t, "unknown"]).to(self.device))
                    for t in text_batch
                ]
                embeddings = input_tokens
                for doc, embedding in zip(docs_batch, embeddings):
                    doc.embedding = embedding
            t2 = time.time()
            print("encode text cost:", t2 - t1)
            print(t1)
            print(t2)
```

Encode the text data and store it in the DocumentArray so it can be passed downstream. A usage sketch follows.
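As a usage sketch, the encoder can be served in a Flow and queried with plain text ('config/clip_text.yml' is an assumed path to the YAML shown above; the query string is a placeholder):

```python
from jina import Flow
from docarray import Document, DocumentArray

f = Flow().add(uses='config/clip_text.yml')  # assumed YAML path
with f:
    docs = f.post('/', inputs=DocumentArray([Document(text='a dog running')]))
    print(docs[0].embedding)  # the CLIP text embedding
```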

CustomClipImage

Basic Implementation

YAML Configuration

```yaml
jtype: CLIPImageEncoder
metas:
  py_modules:
    - clip_image.py
```

Importing Third-Party Libraries

```python
import time
from typing import Dict, Optional, Tuple

import clip
import numpy as np
import torch
from docarray import DocumentArray
from jina import Executor, requests
from jina.logging.logger import JinaLogger
from PIL import Image
from transformers import CLIPFeatureExtractor, CLIPModel
```

Class Initialization

```python
class CLIPImageEncoder(Executor):
    def __init__(
        self,
        pretrained_model_name_or_path: str = 'ViT-B/32',
        device: str = 'cpu',
        batch_size: int = 32,
        traversal_paths: str = '@r',
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
```

Parameter explanations:

  • pretrained_model_name_or_path: either an online repository on Hugging Face or a local directory; here the pretrained model is Vision Transformer-Base/32, which splits its input into 32×32 patches
  • device: the device used for preprocessing
  • batch_size: the batch size
  • traversal_paths: the traversal paths
```python
        self.batch_size = batch_size
        self.traversal_paths = traversal_paths
        self.pretrained_model_name_or_path = pretrained_model_name_or_path
        self.logger = JinaLogger(self.__class__.__name__)
```

Set up the logger.

```python
        self.device = device
        model, preprocessor = clip.load(self.pretrained_model_name_or_path, device=device)
        self.preprocessor = preprocessor
        self.model = model
```

Image Encoding

See the official documentation for details on the @requests decorator.

```python
    @requests
    def encode(self, docs: DocumentArray, parameters: dict, **kwargs):
        t1 = time.time()
        print('clip_image encode', t1)
        document_batches_generator = DocumentArray(
            filter(
                lambda x: x is not None,
                docs[parameters.get('traversal_paths', self.traversal_paths)],
            )
        ).batch(batch_size=parameters.get('batch_size', self.batch_size))
```

Parameter explanations:

  • docs: the DocumentArray of Documents to be encoded
  • parameters: a dict of parameters controlling the encoding (keys include traversal_paths and batch_size)

Filter by data type and batch all the images.

```python
        with torch.inference_mode():
            for batch_docs in document_batches_generator:
                print('in for')
                for d in batch_docs:
                    print('in clip image d.uri', d.uri, len(d.chunks))
                    tensors_batch = []
                    for c in d.chunks:
                        if c.modality == 'image':
                            image_embedding = self.model.encode_image(
                                self.preprocessor(Image.fromarray(c.tensor))
                                .unsqueeze(0)
                                .to(self.device)
                            )
                            tensors_batch.append(
                                np.array(image_embedding).astype('float32')
                            )
                    embedding = tensors_batch
                    d.embedding = embedding
        t2 = time.time()
        print('clip_image encode end', t2 - t1, t2)
```

Encode the image data held in each chunk and store the embeddings in the DocumentArray so they can be passed downstream.

Going Further

Calling the Executor

Most functionality users are likely to need has already been published to Jina Hub; the main body of CustomClipImage is also accessible there, so you can call the packaged Executor directly and build your own feature modules. The latest-gpu version can additionally make use of GPU memory.

Flow configuration

```yaml
executors:
  - name: encoder
    uses: 'jinahub://CLIPImageEncoder/latest'
    timeout_ready: -1
    uses_with:
      name: openai/clip-vit-base-patch32
```

SimpleIndexer

Logic structure diagram

Basic Implementation

Note: this Python walkthrough follows the order of the code, but because the methods call each other in fairly involved ways, it is best read alongside the logic structure diagram.

YAML Configuration

```yaml
jtype: SimpleIndexer
with:
  match_args:
    limit: $TOP_K
    traversal_rdarray: 'c'
metas:
  py_modules:
    - executor.py
  workspace: workspace/
```

limit is the maxCount described in the clip-retrieval section below.

Importing Third-Party Libraries

```python
import inspect
import os
import time
from typing import Dict, Optional

import clip
import torch
from jina import DocumentArray, Executor, requests
from jina.logging.logger import JinaLogger
from torch import Tensor
```

The inspect library is used to obtain information about live objects; here it helps validate what a class's methods actually accept.
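A quick illustration of the inspect call that _filter_match_params relies on later; the match function below is a stand-in with made-up parameters:

```python
import inspect

def match(self, darray, limit=20, metric='cosine'):  # stand-in signature
    ...

args = set(inspect.getfullargspec(match).args)
args.discard('self')
print(args)  # {'darray', 'limit', 'metric'} (set order may vary)
```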

Class Initialization

Basic concepts:

  • An Executor is Jina's carrier for processing Documents
  • A Flow is Jina's tool for making Executors more efficient and scalable (suitable for large-scale data)

Benefits of introducing Executors (a toy example follows this list):

  • Different DocumentArray-based functions can share the same configuration state (in line with OOP thinking)
  • Functions become compatible with Flows
  • Executors in a Flow can process multiple DocumentArrays simultaneously and can be deployed to the cloud quickly
  • They can be containerized and shared via jina hub push/pull
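To make this concrete, here is a minimal toy Executor served in a Flow (not part of this project's code):

```python
from jina import Executor, Flow, requests
from docarray import Document, DocumentArray

class UpperCaser(Executor):
    """A toy Executor that upper-cases every Document's text."""

    @requests
    def upper(self, docs: DocumentArray, **kwargs):
        for doc in docs:
            doc.text = doc.text.upper()

with Flow().add(uses=UpperCaser) as f:
    result = f.post('/', inputs=DocumentArray([Document(text='hello jina')]))
    print(result[0].text)  # HELLO JINA
```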
```python
class SimpleIndexer(Executor):
    FILE_NAME = 'index.db'

    def __init__(
        self,
        pretrained_model_name_or_path: str = 'ViT-B/32',
        match_args: Optional[Dict] = None,
        table_name: str = 'simple_indexer_table2',
        traversal_right: str = '@r',
        traversal_left: str = '@r',
        device: str = 'cpu',
        **kwargs,
    ):
        super().__init__(**kwargs)
```

Parameter explanations:

  • pretrained_model_name_or_path: the pretrained model, Vision Transformer-Base/32, which splits its input into 32×32 patches; other architectures are also available
  • match_args: arguments for the DocumentArray match function
  • table_name: the database table name
  • traversal_right: the default traversal path of the indexed DocumentArray
  • traversal_left: the default traversal path of the query DocumentArray
  • device: the device used for preprocessing
```python
        self._match_args = match_args or {}
        self._index = DocumentArray(
            storage='sqlite',
            config={
                'connection': os.path.join(self.workspace, SimpleIndexer.FILE_NAME),
                'table_name': table_name,
            },
        )
        self.logger = JinaLogger(self.metas.name)
        self.default_traversal_right = traversal_right
        self.default_traversal_left = traversal_left
        self.pretrained_model_name_or_path = pretrained_model_name_or_path
        self.device = device
        model, preprocessor = clip.load(self.pretrained_model_name_or_path, device=device)
        self.preprocessor = preprocessor
        self.model = model

    @property
    def table_name(self) -> str:
        return self._index._table_name
```
  • storage: the storage backend to use
  • connection: the path to the database

Building the Index

The @requests decorator:

  • A decorated method is mapped to a network endpoint when serving, bound to specific network requests, and responds to search requests
  • The optional on= parameter works like routing in Node.js: it binds the decorated method to the given path
```python
    @requests(on='/index')
    def index(
        self,
        docs: 'DocumentArray',
        **kwargs,
    ):
        t1 = time.time()
        if docs:
            self._index.extend(docs)
        t2 = time.time()
        print(t2 - t1)
        print(t1)
        print(t2)
```

docs are stored uniformly as a DocumentArray and appended to the index. A usage sketch follows.
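A sketch of hitting this endpoint from client code ('config/indexer.yml' is an assumed path to the YAML shown earlier; in the real project the Documents would already carry frame chunks and embeddings):

```python
from jina import Flow
from docarray import Document, DocumentArray

f = Flow().add(uses='config/indexer.yml')  # assumed YAML path
with f:
    f.post(on='/index', inputs=DocumentArray([Document(uri='video1.mp4')]))
```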

Neural Search

```python
    @requests(on='/search')
    def search(
        self,
        docs: 'DocumentArray',
        parameters: Optional[Dict] = None,
        **kwargs,
    ):
        match_args = (
            {**self._match_args, **parameters}
            if parameters is not None
            else self._match_args
        )
        traversal_right = parameters.get('traversal_right', self.default_traversal_right)
        traversal_left = parameters.get('traversal_left', self.default_traversal_left)
        match_args = SimpleIndexer._filter_match_params(docs, match_args)
        # print('in indexer', docs[traversal_left].embeddings.shape, self._index[traversal_right])
        texts: DocumentArray = docs[traversal_left]
        stored_docs: DocumentArray = self._index[traversal_right]
```

Initialize the parameters: texts and stored_docs are, respectively, the query text entered in the demo and the frames extracted from the indexed videos.

```python
        doc_ids = parameters.get("doc_ids")
        t1 = time.time()
        with torch.inference_mode():
            t1_00 = time.time()
            for text in texts:
                result = []
                text_features = text.embedding
                text.embedding = None
                for sd in stored_docs:
                    if doc_ids is not None and sd.uri not in doc_ids:
                        continue
                    images_features = sd.embedding
                    print('images len', len(images_features))
                    t1_0 = time.time()
                    tensor_images_features = [
                        Tensor(image_features) for image_features in images_features
                    ]
                    t1_1 = time.time()
```

Work with the text and image embeddings separately.

```python
                    for i, image_features in enumerate(tensor_images_features):
                        tensor = image_features
                        probs = self.score(tensor, text_features)
                        result.append({
                            "score": probs[0][0],
                            "index": i,
                            "uri": sd.uri,
                            "id": sd.id,
                        })
                    t1_2 = time.time()
                    print("tensor cost:", t1_1 - t1_0)
                    print("part score cost:", t1_2 - t1_1)
                    print(t1_0)
                    print(t1_1)
                    print(t1_2)
                t2 = time.time()
                print('score cost:', t2 - t1)
                # print(parameters, type(parameters.get("thod")))
```

Compute the text-image match score via self.score and record an entry for each frame.

```python
                index_list = self.getMultiRange(
                    result,
                    0.1 if parameters.get("thod") is None else parameters.get('thod'),
                )
                t3 = time.time()
                print('range cost:', t3 - t2)
                print(t1)
                print(t1_00)
                print(t2)
                print(t3)
                # print(index_list)
```

The similarity threshold thod is read here and passed to getMultiRange.

```python
                docArr = DocumentArray.empty(len(index_list))
                for i, doc in enumerate(docArr):
                    doc.tags["leftIndex"] = index_list[i]["leftIndex"]
                    doc.tags["rightIndex"] = index_list[i]["rightIndex"]
                    # print(index_list[i])
                    doc.tags["maxImageScore"] = float(index_list[i]["maxImage"]["score"])
                    doc.tags["uri"] = index_list[i]["maxImage"]["uri"]
                    doc.tags["maxIndex"] = index_list[i]["maxImage"]["index"]
                # print(docArr)
                text.matches = docArr
```

Each piece of text is matched separately, yielding the best-corresponding images.

Retrieving Multiple Clips

```python
    def getMultiRange(self, result: list, thod=0.1, maxCount: int = 10):
        ignore_range = {}
        index_list = []
        for i in range(maxCount):
            maxItem = self.getNextMaxItem(result, ignore_range)
            if maxItem is None:
                break
            # print(maxItem["score"])
            leftIndex, rightIndex, maxImage = self.getRange(
                maxItem, result, thod, ignore_range
            )
            index_list.append({
                "leftIndex": leftIndex,
                "rightIndex": rightIndex,
                "maxImage": maxImage,
            })
            if maxImage["uri"] in ignore_range:
                ignore_range[maxImage["uri"]] += list(range(leftIndex, rightIndex + 1))
            else:
                ignore_range[maxImage["uri"]] = list(range(leftIndex, rightIndex + 1))
        # print(ignore_range)
        return index_list
```

This returns up to maxCount (here 10) video clips. To keep the same shot from appearing repeatedly, ignore_range is maintained; without it the clips would be nearly identical, merely shifted by a few frames.

Finding the Maximum

```python
    def getNextMaxItem(self, result: list, ignore_range: dict):
        maxItem = None
        for item in result:
            if item["uri"] in ignore_range and item["index"] in ignore_range[item["uri"]]:
                continue
            if maxItem is None:
                maxItem = item
            if item["score"] > maxItem["score"]:
                maxItem = item
        return maxItem
```

A sequential scan that replaces the current maximum whenever a larger score is found.

Bounding a Single Range

```python
    def getRange(self, maxItem, result: list, thod=0.1, ignore_range: dict = None):
        maxImageScore = maxItem["score"]
        maxImageUri = maxItem["uri"]
        maxIndex = maxItem["index"]
        leftIndex = maxIndex
        rightIndex = maxIndex
        # ignore_range is keyed by uri; look up the frame indices already
        # claimed for this video
        ignored = ignore_range.get(maxImageUri, []) if ignore_range is not None else []
        has_ignore_range = len(ignored) > 0
        d_result = list(filter(lambda x: x["uri"] == maxImageUri, result))
```

Simple initialization, plus the filter restricting candidate frames to the same video.

```python
        for i in range(maxIndex):
            prev_index = maxIndex - 1 - i
            if has_ignore_range and prev_index in ignored:
                break
            # print(maxImageScore, thod, maxImageUri, maxIndex)
            if d_result[prev_index]["score"] >= maxImageScore - thod:
                leftIndex = prev_index
            else:
                break
        for i in range(maxIndex + 1, len(d_result)):
            if has_ignore_range and i in ignored:
                break
            if d_result[i]["score"] >= maxImageScore - thod:
                rightIndex = i
            else:
                break
```

Starting from the most similar frame, extend the range to the left and right. Here thod behaves somewhat like a derivative bound: a frame whose score falls too far below the peak is not included in the clip, and adjusting the variable makes the criterion stricter or looser. (A toy illustration follows the method.)

```python
        if (rightIndex - leftIndex) > 60:
            return self.getRange(maxItem, result, thod / 2, ignore_range)
        return leftIndex, max(rightIndex, leftIndex + 10), d_result[maxIndex]
```

When the clip grows too long, thod is halved, which raises the bar for inclusion and guarantees the recomputed clip is no longer than the previous attempt, compressing it. Meanwhile max(rightIndex, leftIndex + 10) ensures the clip spans at least 10 frames (about 10 s at 1 fps).
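A toy numeric illustration of the expansion rule (made-up scores; this mirrors the logic above without the ignore_range bookkeeping):

```python
scores = [0.20, 0.55, 0.58, 0.60, 0.57, 0.30]  # made-up per-frame scores
peak = 3     # index of the best-matching frame (score 0.60)
thod = 0.1   # frames must stay within thod of the peak score

left = right = peak
while left > 0 and scores[left - 1] >= scores[peak] - thod:
    left -= 1
while right < len(scores) - 1 and scores[right + 1] >= scores[peak] - thod:
    right += 1
print(left, right)  # -> 1 4: frames 1..4 form the clip; 0 and 5 fall off
```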

Score Computation

```python
    def score(self, image_features, text_features):
        logit_scale = self.model.logit_scale.exp()
        image_features = image_features / image_features.norm(dim=1, keepdim=True)
        text_features = text_features / text_features.norm(dim=1, keepdim=True)
        logits_per_image = logit_scale * image_features @ text_features.t()
        probs = logits_per_image.softmax(dim=-1).cpu().detach().numpy()
        return probs
```

Normalize the features, compute the cosine similarity, and turn it into probabilities with a softmax.
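In formula form, with image features I, text features T, and learned logit scale s (this restates the code above):

```latex
\mathrm{probs}
  = \operatorname{softmax}\!\left( e^{s}\, \hat{I}\, \hat{T}^{\top} \right),
\qquad
\hat{I} = \frac{I}{\lVert I \rVert}, \quad
\hat{T} = \frac{T}{\lVert T \rVert}
```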

Static Methods

```python
    @staticmethod
    def _filter_match_params(docs, match_args):
        # get only those arguments that exist in .match
        args = set(inspect.getfullargspec(docs.match).args)
        args.discard('self')
        match_args = {k: v for k, v in match_args.items() if k in args}
        return match_args
```

Conditional filtering implemented via the dict's key-value pairs.

```python
    @requests(on='/delete')
    def delete(self, parameters: Dict, **kwargs):
        deleted_ids = parameters.get('ids', [])
        if len(deleted_ids) == 0:
            return
        del self._index[deleted_ids]
```

A basic delete operation.

```python
    @requests(on='/update')
    def update(self, docs: DocumentArray, **kwargs):
        for doc in docs:
            try:
                self._index[doc.id] = doc
            except IndexError:
                self.logger.warning(
                    f'cannot update doc {doc.id} as it does not exist in storage'
                )
```

The update operation.

```python
    @requests(on='/fill_embedding')
    def fill_embedding(self, docs: DocumentArray, **kwargs):
        for doc in docs:
            doc.embedding = self._index[doc.id].embedding
```

Fetch an embedding by Document id.

```python
    @requests(on='/clear')
    def clear(self, **kwargs):
        self._index.clear()
```

Empty the database.

Going Further

Using the GPU and Coordinating with the CPU

When embedding, pass .embed(..., device='cuda') to bring in the GPU (guarded by if torch.cuda.is_available()); if the DocumentArray is very large, the batch size can be adjusted with .embed(..., batch_size=128). A sketch follows.
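A minimal sketch of that pattern, assuming a toy dataset and a toy torch model (both placeholders, not project code):

```python
import numpy as np
import torch
from docarray import Document, DocumentArray

# toy documents with 8-dim tensors, and a toy embedding model
da = DocumentArray(
    Document(tensor=np.random.rand(8).astype('float32')) for _ in range(256)
)
model = torch.nn.Linear(8, 4)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
da.embed(model, device=device, batch_size=128)
print(da.embeddings.shape)  # (256, 4)
```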

In deep-learning scenarios, large amounts of data are often loaded, preprocessed on the CPU, and then fed to the GPU for training. DocArray's dataloader() supports this by processing the data in parallel, batch by batch.

Consider the example from the official documentation. Given a .proto file compressed as tar.gz, the data can be loaded as follows (num_worker is the number of parallel workers):

```python
import time
from docarray import DocumentArray

def cpu_job(da):
    time.sleep(2)
    print('cpu job done')
    return da

def gpu_job(da):
    time.sleep(1)
    print('gpu job done')

for da in DocumentArray.dataloader(
    'da.protobuf.gz', func=cpu_job, batch_size=64, num_worker=4
):
    gpu_job(da)
```

This keeps the pipeline flowing while avoiding several problems:

  • The dataset being too large to fit in memory
  • The CPU running on only a single core
  • Slow CPU preprocessing leaving the GPU underutilized

Note that a pipeline follows the bucket principle: it is only as fast as its slowest stage, so limited resources must be scheduled sensibly and fully utilized.

Vector Search

The source code uses SQLite as the backend, which performs well when reading, updating, deleting, and conditionally retrieving Documents at fairly large scale (see the One Million Scale Benchmark). SQLite is not ideal for vector search, however: although its Recall@10 reaches 1.00 in that benchmark, it does so by exhaustive comparison rather than nearest-neighbour retrieval, so it is very inefficient.

Elasticsearch performs best on the recall task and, as a distributed search and analytics engine, is quite common. This section therefore sketches how to use Elasticsearch within the Jina ecosystem; see the ES-Jina folder for details.

The YAML configuration is as follows:

version: "3.7"
services:elastic:image: docker.elastic.co/elasticsearch/elasticsearch:8.1.0environment:- xpack.security.enabled=false- discovery.type=single-nodeports:- "9200:9200"networks:- elasticnetworks:elastic:name: elastic
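Once the service is up, a DocumentArray can be pointed at it as its storage backend; this is a sketch, where n_dim=512 is assumed to match ViT-B/32 embeddings:

```python
from docarray import DocumentArray

da = DocumentArray(
    storage='elasticsearch',
    config={'hosts': 'http://localhost:9200', 'n_dim': 512},
)
```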

Using Jina Hub

Do not reinvent the wheel.
A prototype of SimpleIndexer already exists on Jina Hub, with source code available; it implements most functionality apart from the custom search and can be called directly.

Here are two ways to call it via Docker, each needing only a few lines of code (Docker must be installed):

  1. DocArray entrypoint

```python
from docarray import Document, DocumentArray

da = DocumentArray([Document(text='hello')])
r = da.post('jinahub+docker://SimpleIndexer/latest')
print(r.to_json())
```

Start the service with python SimpleIndexer-docarray-docker.py.

  2. Jina entrypoint

```python
from jina import Flow
from docarray import Document, DocumentArray

f = Flow().add(uses='jinahub+docker://SimpleIndexer/latest')
with f:
    r = f.post('/', inputs=DocumentArray([Document(text='hello')]))
    print(r.to_json())
```

Start the service with python SimpleIndexer-jina-docker.py.

References

Document
DocumentArray
