0%

GlobalPointer

  1. 参考:https://kexue.fm/archives/8373

  2. 计算公式:
    $$
    p(h_s, h_e, t) = q_{s,t}^Tk_{e,t}
    $$ {d}

    $$
    q_{s,t}=W_{s,t}h_s+b_{s,t}
    $$

    $$
    k_{e,t}=W_{e,t}h_s+b_{e,t}
    $$

  3. 核心思想

    类似于attention的打分机制,将多个实体类型的识别视为Muti-head机制,没一个head视为一种实体识别任务,最后利用attention的score(QK)作为打分

    考虑到start和end之间距离的关键信息,作者加入了旋转式位置编码(RoPE)。

  4. 核心代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    class GlobalPointer(Module):
    """全局指针模块
    将序列的每个(start, end)作为整体来进行判断
    """
    def __init__(self, heads, head_size,hidden_size,RoPE=True):
    super(GlobalPointer, self).__init__()
    self.heads = heads
    self.head_size = head_size
    self.RoPE = RoPE
    # 每个head代表一个类别,head_size*2是应为需要表征Q和K
    self.dense = nn.Linear(hidden_size,self.head_size * self.heads * 2)

    def forward(self, inputs, mask=None):
    inputs = self.dense(inputs)
    inputs = torch.split(inputs, self.head_size * 2 , dim=-1)
    inputs = torch.stack(inputs, dim=-2)
    qw, kw = inputs[..., :self.head_size], inputs[..., self.head_size:]
    # RoPE编码
    if self.RoPE:
    pos = SinusoidalPositionEmbedding(self.head_size, 'zero')(inputs)
    cos_pos = pos[..., None, 1::2].repeat(1,1,1,2)
    sin_pos = pos[..., None, ::2].repeat(1,1,1,2)
    qw2 = torch.stack([-qw[..., 1::2], qw[..., ::2]], 4)
    qw2 = torch.reshape(qw2, qw.shape)
    qw = qw * cos_pos + qw2 * sin_pos
    kw2 = torch.stack([-kw[..., 1::2], kw[..., ::2]], 4)
    kw2 = torch.reshape(kw2, kw.shape)
    kw = kw * cos_pos + kw2 * sin_pos
    # 计算内积
    logits = torch.einsum('bmhd , bnhd -> bhmn', qw, kw)
    # 排除padding,排除下三角
    logits = add_mask_tril(logits,mask)
    return logits / self.head_size ** 0.5

TPLinker(Muti_head)

  1. 参考:https://aclanthology.org/2020.coling-main.138/

  2. 计算公式
    $$
    p(h_s, h_e, t) = W_t \cdot h_{s, e}+b_t
    $$

    $$
    h_{s,e} = tanh(W_h\cdot[h_s;h_e]+b_h)
    $$

  3. 核心思想

    与globalPointer相比,muti-head是加性的,而GlobalPointer是乘性的。

  4. 核心代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    class MutiHeadSelection(Module):

    def __init__(self,hidden_size,c_size,abPosition = False,rePosition=False, maxlen=None,max_relative=None):
    super(MutiHeadSelection, self).__init__()
    self.hidden_size = hidden_size
    self.c_size = c_size
    self.abPosition = abPosition
    self.rePosition = rePosition
    self.Wh = nn.Linear(hidden_size * 2,self.hidden_size)
    self.Wo = nn.Linear(self.hidden_size,self.c_size)
    if self.rePosition:
    self.relative_positions_encoding = relative_position_encoding(max_length=maxlen,
    depth= 2 * hidden_size,max_relative_position=max_relative)

    def forward(self, inputs, mask=None):
    input_length = inputs.shape[1]
    batch_size = inputs.shape[0]
    if self.abPosition:
    # 由于为加性拼接,我们无法使用RoPE,因此这里直接使用绝对位置编码
    inputs = SinusoidalPositionEmbedding(self.hidden_size, 'add')(inputs)
    x1 = torch.unsqueeze(inputs, 1)
    x2 = torch.unsqueeze(inputs, 2)
    x1 = x1.repeat(1, input_length, 1, 1)
    x2 = x2.repeat(1, 1, input_length, 1)
    concat_x = torch.cat([x2, x1], dim=-1)
    # 与TPLinker原论文中不同的是,通过重复+拼接的方法构建的矩阵能满足并行计算的要求。
    if self.rePosition:
    # 如果使用相对位置编码,我们则直接在矩阵上实现相加
    relations_keys = self.relative_positions_encoding[:input_length, :input_length, :].to(inputs.device)
    concat_x += relations_keys
    hij = torch.tanh(self.Wh(concat_x))
    logits = self.Wo(hij)

Tencent Muti-head

  1. 参考:https://arxiv.org/pdf/2012.05426v5.pdf

  2. 计算公式
    $$
    p(h_s, h_e, t) = U\cdot tanh(Vs_{s,e})
    $$

    $$
    s_{s,e}=[h_s;h_e;h_s-h_e;h_s\cdot h_e]
    $$

    与TPLinker相比,加入了更多的交互元素:$h_s-h_e$ ,$h_s\cdot h_e$ (作差与点积)

  3. 核心思想

    提出了基于片段标注解决实体数据标注缺失的训练方法——负采样。

  4. 核心代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    def generate_whole_label(self, positions, length):
    """
    负采样方式:直接随机采样k(文本长度*采样率)个负样本
    """
    neg_positions = []
    neg_num = int(length * self.neg_rate) + 1

    # 候选样本,即排除正样本之后的全体负样本
    candies = flat_list([[(i, j) for j in range(i, length) if (i, j) not in positions] for i in range(length)])

    if len(candies) > 0:
    sample_num = min(neg_num, len(candies))
    assert sample_num > 0

    # 随机采样若干个负样本
    np.random.shuffle(candies)
    for i, j in candies[:sample_num]:
    neg_positions.append((i, j))

    return neg_positions
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    class TxMutihead(Module):

    def __init__(self,hidden_size,c_size,abPosition = False,rePosition=False, maxlen=None,max_relative=None):
    super(TxMutihead, self).__init__()
    self.hidden_size = hidden_size
    self.c_size = c_size
    self.abPosition = abPosition
    self.rePosition = rePosition
    self.Wh = nn.Linear(hidden_size * 4, self.hidden_size)
    self.Wo = nn.Linear(self.hidden_size,self.c_size)
    if self.rePosition:
    self.relative_positions_encoding = relative_position_encoding(max_length=maxlen,
    depth= 4 * hidden_size,max_relative_position=max_relative)

    def forward(self, inputs, mask=None):
    input_length = inputs.shape[1]
    batch_size = inputs.shape[0]
    if self.abPosition:
    # 由于为加性拼接,我们无法使用RoPE,因此这里直接使用绝对位置编码
    inputs = SinusoidalPositionEmbedding(self.hidden_size, 'add')(inputs)
    x1 = torch.unsqueeze(inputs, 1)
    x2 = torch.unsqueeze(inputs, 2)
    x1 = x1.repeat(1, input_length, 1, 1)
    x2 = x2.repeat(1, 1, input_length, 1)
    concat_x = torch.cat([x2, x1,x2-x1,x2.mul(x1)], dim=-1)
    if self.rePosition:
    relations_keys = self.relative_positions_encoding[:input_length, :input_length, :].to(inputs.device)
    concat_x += relations_keys
    hij = torch.tanh(self.Wh(concat_x))
    logits = self.Wo(hij)
    logits = logits.permute(0,3,1,2)
    logits = add_mask_tril(logits, mask)
    return logits

Deep Biaffine

  1. 参考:https://arxiv.org/pdf/2005.07150.pdf

  2. 计算公式
    $$
    p(h_s, h_e, t) = h_s^TU_th_e+W_t[h_s;h_e]+b
    $$

  3. 核心思想

    deep biaffine是加性和乘性的结合

  4. 核心代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    class Biaffine(Module):

    def __init__(self, in_size, out_size, Position = False):
    super(Biaffine, self).__init__()
    self.out_size = out_size
    self.weight1 = Parameter(torch.Tensor(in_size, out_size, in_size))
    self.weight2 = Parameter(torch.Tensor(2 * in_size + 1, out_size))
    self.Position = Position
    self.reset_parameters()

    def reset_parameters(self):
    torch.nn.init.kaiming_uniform_(self.weight1,a=math.sqrt(5))
    torch.nn.init.kaiming_uniform_(self.weight2,a=math.sqrt(5))

    def forward(self, inputs, mask = None):
    input_length = inputs.shape[1]
    hidden_size = inputs.shape[-1]
    if self.Position:
    #引入绝对位置编码,在矩阵乘法时可以转化为相对位置信息
    inputs = SinusoidalPositionEmbedding(hidden_size, 'add')(inputs)
    x1 = torch.unsqueeze(inputs, 1)
    x2 = torch.unsqueeze(inputs, 2)
    x1 = x1.repeat(1, input_length, 1, 1)
    x2 = x2.repeat(1, 1, input_length, 1)
    concat_x = torch.cat([x2, x1], dim=-1)
    concat_x = torch.cat([concat_x, torch.ones_like(concat_x[..., :1])],dim=-1)
    # bxi,oij,byj->boxy
    logits_1 = torch.einsum('bxi,ioj,byj -> bxyo', inputs, self.weight1, inputs)
    logits_2 = torch.einsum('bijy,yo -> bijo', concat_x, self.weight2)
    logits = logits_1 + logits_2
    logits = logits.permute(0,3,1,2)
    logits = add_mask_tril(logits, mask)
    return logits

ChatGLM3(6B)、Qwen(7B,14B)、Baichuan2(7B,13B)等相关大模型的应用先关知识点。

一般来说,模型参数超过1B(10亿)才能称之为大模型。比如bert参数量为110M(1.15亿左右)就不能算大模型。

  1. 中文大模型性能测评(2023年12月)

image-20240112094237809

  1. base与chat(如qwen-7B、qwen-7B-chat)

    • base:base模型是在大规模语料上以LM这种形式的预训练任务(预测下一个token)训练得来的
    • chat:chat模型是在base模型上sft+rlhf上微调得来的

    简而言之,base模型很强但是听不懂人话,需要用少量的tuning(微调)数据来让它听懂人话,进而和人类对话。

  2. 模型量化

    大模型权重都很大,占用显存很高,所以需要对其量化,float16->int8->int4

    大模型(chatglm3-6B) 模型大小 最低显存
    float16 12g 13g
    int8 8g
    int4 4g 6g
  3. 模型微调

    基于开源大模型,使用私有数据进行微调,使模型更加适配垂直领域。如:

    微调方案

    • Freeze: 参数冻结,对原始模型部分参数进行冻结操作,仅训练部分参数,以达到在单卡或不进行TP或PP操作,就可以对大模型进行训练。

    • P-Tuning(v1, v2):针对于大模型的soft-prompt方法

    • lora: 在大型语言模型上对指定参数增加额外的低秩矩阵,并在模型训练过程中,仅训练而外增加的参数。当“秩值”远小于原始参数维度时,新增的低秩矩阵参数量很小,达到仅训练很小的参数,就能获取较好的结果。

  4. 模型推理框架

    原生的大模型推理速度太慢,使用一些大模型推理加速框架可以加速推理。

    • vLLM(加州大学伯克利分校)

      vLLM是一个开源的大模型推理加速框架,通过PagedAttention高效地管理attention中缓存的张量,实现了比HuggingFace Transformers高14-24倍的吞吐量。

    • HuggingFace TGI(HuggingFace官方)

      Text Generation Inference(TGI)是 HuggingFace 推出的一个项目,作为支持 HuggingFace Inference API 和 Hugging Chat 上的LLM 推理的工具,旨在支持大型语言模型的优化推理。

    • llama.cpp(Georgi Gerganov开发,资深业界大佬)

    • JittorLLMs

    • lightLLM

    • TensorRT-LLM(NVIDIA)

    • fastllm(目前使用,国人开发)

      纯c++的全平台llm加速库,便于跨平台移植,可以在安卓上直接编译。

0. 安装加密库

1
pip install encryptKit -i https://pypi.org/simple

基于github源码安装

1
pip install git+https://github.com/fushengwuyu/pyencryption

1. 项目加密

使用Cython将python脚本加密为C语言的动态库,除了预留一个项目入口之外,其余统一将.py脚本转化为.so库,实现对整个python项目加密的任务。

执行命令:

1
2
3
4
5
6
7
8
9
10
11
python -m encryption.encrypt_code --help
usage: encrypt_code.py [-h] [--except_dirs EXCEPT_DIRS [EXCEPT_DIRS ...]] [--enc_dir ENC_DIR] [--build_dir BUILD_DIR] [--entrance ENTRANCE]

encrypt project

optional arguments:
-h, --help show this help message and exit
--except_dirs EXCEPT_DIRS [EXCEPT_DIRS ...] 需要屏蔽加密的路径
--enc_dir ENC_DIR 解密项目路径
--build_dir BUILD_DIR 加密后的路径
--entrance ENTRANCE 项目程序入口

可以看到相关参数的含义。

使用示例:

1
python -m encryption.encrypt_code --enc_dir npc_server --build_dir npc_server_enc

加密前项目目录如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
├── apps
│   ├── auth
│   │   ├── auth.py
│   │   ├── __init__.py
│   ├── extract
│   │   ├── chat.py
│   │   ├── __init__.py
│   │   ├── time_extract.py
│   │   └── view.py
│   ├── http_response.py
│   ├── __init__.py
│   ├── robot_notice
│   │   ├── info_gather.py
│   │   ├── __init__.py
│   │   └── view.py
│   └── utils.py
├── config.yaml
├── mq_mode
│   ├── __init__.py
│   ├── mqtt_plugins.py
│   └── utils.py
├── mq_run.py
├── readme.md
├── run.py
└── t.py

加密后为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
├── apps
│   ├── auth
│   │   ├── auth.cpython-38-x86_64-linux-gnu.so
│   │   ├── __init__.cpython-38-x86_64-linux-gnu.so
│   │   └── __init__.py
│   ├── extract
│   │   ├── chat.cpython-38-x86_64-linux-gnu.so
│   │   ├── __init__.cpython-38-x86_64-linux-gnu.so
│   │   ├── __init__.py
│   │   ├── time_extract.cpython-38-x86_64-linux-gnu.so
│   │   └── view.cpython-38-x86_64-linux-gnu.so
│   ├── http_response.cpython-38-x86_64-linux-gnu.so
│   ├── __init__.cpython-38-x86_64-linux-gnu.so
│   ├── __init__.py
│   ├── robot_notice
│   │   ├── info_gather.cpython-38-x86_64-linux-gnu.so
│   │   ├── __init__.cpython-38-x86_64-linux-gnu.so
│   │   ├── __init__.py
│   │   └── view.cpython-38-x86_64-linux-gnu.so
│   └── utils.cpython-38-x86_64-linux-gnu.so
├── config.yaml
├── mq_mode
│   ├── __init__.cpython-38-x86_64-linux-gnu.so
│   ├── __init__.py
│   ├── mqtt_plugins.cpython-38-x86_64-linux-gnu.so
│   └── utils.cpython-38-x86_64-linux-gnu.so
├── mq_run.cpython-38-x86_64-linux-gnu.so
├── readme.md
├── run.cpython-38-x86_64-linux-gnu.so
├── run.py
└── t.cpython-38-x86_64-linux-gnu.so

正常启动项目入口python run.py 即可启动项目。

2. license授权与认证

待续。。。

通常来说,模型推理是一个耗时任务,如何在接受模型推理的请求之后,不阻塞其他请求,就成为了模型部署过程中的一个难点问题。很容易想到使用异步方式来实现,接下来,使用使用sanic框架来演示如何实现模型的异步部署。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# author: sunshine
# datetime:2023/12/14 下午2:10
import asyncio
import time
import functools
from sanic import Sanic
from sanic.response import text
from apps.service.uie_predictor import load_uie_model

app = Sanic(__name__)
uie = load_uie_model()

text1 = '如何演好自己的角色,请读《演员自我修养》《喜剧之王》周星驰崛起于穷困潦倒之中的独门秘笈'


@app.get('/f1')
async def f1(request):
result = await async_decorate(model_inference, text)
return text(result)


def model_inference(text):
schema = [{'人物': ['祖籍', '出生地', '民族', '毕业院校', '出生日期', '妻子'], "影视作品": ["上映日期", '导演', '制片人', '主演']}]
uie.set_schema(schema)
result = uie(text1)
time.sleep(10)
return str(result)


async def async_decorate(func, *args, **kwargs):
loop = asyncio.get_event_loop()
partial_func = functools.partial(func, **kwargs)
result = await loop.run_in_executor(None, partial_func, *args)
return result


@app.get('/f2')
def f2(request):
return text('f2')




if __name__ == '__main__':
app.run('0.0.0.0', port=5555)

最核心的在于使用loop.run_in_executor函数将同步函数包装为异步函数。

更进一步,可以将异步包装函数封装为一个装饰器,简化使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# author: sunshine
# datetime:2023/12/14 下午2:10
import asyncio
import functools
import time

from sanic import Sanic
from sanic.response import text
from apps.service.uie_predictor import load_uie_model

app = Sanic(__name__)
uie = load_uie_model()

text1 = '如何演好自己的角色,请读《演员自我修养》《喜剧之王》周星驰崛起于穷困潦倒之中的独门秘笈'


def async_decorator(func):
async def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
partial_func = functools.partial(func, **kwargs)
result = await loop.run_in_executor(None, partial_func, *args)
return result

return wrapper


@app.get('/f1')
async def f1(request):
result = await model_inference(text1)
return text(result)


@async_decorator
def model_inference(sentence):
schema = [{'人物': ['祖籍', '出生地', '民族', '毕业院校', '出生日期', '妻子'], "影视作品": ["上映日期", '导演', '制片人', '主演']}]
uie.set_schema(schema)
result = uie(sentence)
time.sleep(10)
return str(result)


@app.get('/f2')
def f2(request):
return text('f2')


if __name__ == '__main__':
app.run('0.0.0.0', port=5555)

二者作用相当。

针对模型线上推理服务,一个请求一个请求的推理,其服务的吞吐量会及其低下,也做不到资源的充分利用。可以考虑批量预测的方式,提升服务的并发能力, 模型推理时间与批处理数量batch并不呈现线性关系,直到模型占用全部显存,到达batch瓶颈。

要实现上面的目标,需要以下几个模块

  • 前端服务:用于接收请求、返回结果。可以是Http、PRC等各种协议。是一个独立进程。
  • 推理Worker:负责模型的初始化、批量推理数据构建、推理计算。是一个独立进程。
  • 任务队列:前端服务收到请求之后把计算任务送入任务队列;推理Worker监听该队列,每次取出一个小批量由模型推理
  • 结果队列:推理服务推理完成后将结果送入结果队列;前端服务监听该队列,获得推理结果
  • 结果分发:在将任务送入任务队列前需要生成任务的唯一标识,从结果队列取回结果后根据标识获取到任务对应的结果

其中两个任务队列的实现方式很多,可以通过一些成熟的中间件例如Kafka、Redis等,但为了避免外部依赖,这次我选择使用Python原生的多进程队列。结果队列监听和分发通过前端服务进程的一个子线程来完成。

以加载清华开源的glm模型为例,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
import time
from transformers.generation.logits_process import LogitsProcessor
from sanic import Sanic
from sanic.response import json
import asyncio
import logging
import multiprocessing as mp
import threading
import uuid
from queue import Empty
from transformers import AutoTokenizer, AutoModel
from cachetools import TTLCache
import torch
from enum import Enum

app = Sanic('test')


class BaseInferLightWorker:

def __init__(self, data_queue: mp.Queue, result_queue: mp.Queue,
model_args: dict,
batch_size=16, max_delay=0.1,
ready_event=None,
max_length: int = 2048, num_beams=1,
do_sample=True, top_p=0.7, temperature=0.95, logits_processor=None, **kwargs
) -> None:
self.data_queue = data_queue
self.result_queue = result_queue
self.batch_size = batch_size
self.max_delay = max_delay
self.logger = logging.getLogger('InferLight-Worker')
self.logger.setLevel(logging.DEBUG)
self.gen_kwargs = {"max_length": max_length, "num_beams": num_beams, "do_sample": do_sample, "top_p": top_p,
"temperature": temperature, "logits_processor": logits_processor, **kwargs}
self.load_model(model_args)

# 由于模型载入时间较长
# 加载完成后使用一个event来通知主进程
if ready_event:
ready_event.set()

def run(self):
self.logger.info('Worker started!')
while True:
data, task_ids = [], []
since = time.time()
for i in range(self.batch_size):
try:
# 从数据队列获取数据
d = self.data_queue.get(block=True, timeout=self.max_delay)
task_ids.append(d[0])
data.append(d[1])
self.logger.info('get one new task')
except Empty:
pass
if time.time() - since >= self.max_delay:
break
if len(data) > 0:
start = time.perf_counter()
batch = self.build_batch(data)
results = self.inference(batch)
end = time.perf_counter()
time_elapsed = (end - start) * 1000
self.logger.info(f'inference succeeded. batch size: {len(data)}, time elapsed: {time_elapsed:.3f} ms')
# 将结果写入结果队列
for (task_id, result) in zip(task_ids, results):
self.result_queue.put((task_id, result))

def build_batch(self, requests):
raise NotImplementedError

def inference(self, batch):
raise NotImplementedError

def load_model(self, model_args):
raise NotImplementedError

@classmethod
def start(cls, data_queue: mp.Queue, result_queue: mp.Queue, model_args: dict, batch_size=16, max_delay=0.1,
ready_event=None):
w = cls(data_queue, result_queue, model_args, batch_size, max_delay, ready_event)
w.run()


class InferStatus(Enum):
SUCCEED = 0
TIMEOUT = 1


class InferResponse:

def __init__(self, status: InferStatus, result) -> None:
self.status = status
self.result = result

def succeed(self):
return self.status == InferStatus.SUCCEED


class LightWrapper:

def __init__(self, worker_class, model_args: dict,
batch_size=16, max_delay=0.1) -> None:
# setup logger
self.logger = logging.getLogger('InferLight-Wrapper')
self.logger.setLevel(logging.INFO)

# 用一个5秒自动超时的缓存来保存结果
self.result_cache = TTLCache(maxsize=10000, ttl=5)

self.mp = mp.get_context('spawn')
self.result_queue = self.mp.Queue()
self.data_queue = self.mp.Queue()

# 启动推理Worker
self.logger.info('Starting worker...')
worker_ready_event = self.mp.Event()
self._worker_p = self.mp.Process(target=worker_class.start, args=(
self.data_queue, self.result_queue, model_args, batch_size, max_delay, worker_ready_event
), daemon=True)
self._worker_p.start()

# 最长等待30秒
is_ready = worker_ready_event.wait(timeout=30)
if is_ready:
self.logger.info('Worker started!')
else:
self.logger.error('Failed to start worker!')

# 启动收集结果的线程
self.back_thread = threading.Thread(
target=self._collect_result, name="thread_collect_result")
self.back_thread.daemon = True
self.back_thread.start()

def _collect_result(self):
# 在线程中不断读取结果队列
# 以task_id为key将结果写入到结果缓存中
self.logger.info('Result collecting thread started!')
while True:
try:
msg = self.result_queue.get(block=True, timeout=0.01)
except Empty:
msg = None
if msg is not None:
(task_id, result) = msg
self.result_cache[task_id] = result

async def get_result(self, task_id):
# 非阻塞地获取任务的结果
while task_id not in self.result_cache:
await asyncio.sleep(0.01)
return self.result_cache[task_id]

async def predict(self, input, timeout=6) -> InferResponse:
# generate unique task_id
task_id = str(uuid.uuid4())

# send input to worker process
self.data_queue.put((task_id, input))
try:
# 这里设置了最大等待时间
result = await asyncio.wait_for(self.get_result(task_id), timeout=timeout)
except asyncio.TimeoutError:
return InferResponse(InferStatus.TIMEOUT, None)

return InferResponse(InferStatus.SUCCEED, result)


class InvalidScoreLogitsProcessor(LogitsProcessor):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
if torch.isnan(scores).any() or torch.isinf(scores).any():
scores.zero_()
scores[..., 20005] = 5e4
return scores


class MyWorker(BaseInferLightWorker):

def load_model(self, model_args):

self.tokenizer = AutoTokenizer.from_pretrained(model_args['model'], trust_remote_code=True)
self.model = AutoModel.from_pretrained(model_args['model'], trust_remote_code=True).half().cuda()
self.device = torch.device('cuda')
return

def build_batch(self, requests):

prompts = []
for query, user_history in requests:
if not user_history:
prompt = query
else:
prompt = ""
for i, (old_query, response) in enumerate(user_history):
prompt += "[Round {}]\n问:{}\n答:{}\n".format(i, old_query, response)
prompt += "[Round {}]\n问:{}\n答:".format(len(user_history), query)
prompts.append(prompt)
input_ids = self.tokenizer(prompts, return_tensors="pt", padding=True)
input_ids = input_ids.to(self.device)
return [input_ids, requests]

@torch.no_grad()
def inference(self, input_ids):
input_ids, requests = input_ids
outputs = self.model.generate(**input_ids, **self.gen_kwargs)
result = []

for output, input_id, (query, user_history) in zip(outputs, input_ids["input_ids"], requests):
out = output.tolist()[len(input_id):]
response = self.tokenizer.decode(out)
response = response.strip()
response = response.replace("[[训练时间]]", "2023年")
user_history = user_history + [(query, response)]
result.append([response, user_history])

return result


@app.post('/batch_predict')
async def batched_predict(request):
# global history
history = request.app.ctx.history
now = time.time()
openid = request.json['openid']
if openid not in history:
user_history = []
else:
user_history = history[openid]["history"]
user_last_time = history[openid]["user_last_time"]

if now - user_last_time > 600:
# 时间间隔超过10分钟,则重置历史
user_history = []

while len(user_history) > 5 or sum([len(x) + len(y) for x, y in user_history]) > 1024:
user_history = user_history[1:]
dummy_input = [request.json['text'], user_history]
# print(request.app.wrapped_model)
response = await request.app.ctx.wrapped_model.predict(dummy_input, timeout=20)

if not response.succeed():
return json({'output': None, 'status': 'failed'})

history[openid] = {"history": response.result[1], "user_last_time": now}
request.app.ctx.history = history
return json({'output': response.result[0]})


config = {
'model': "/sdk/pre_models/chatglm-6b-int4",
'use_cuda': True
}


@app.listener('before_server_start')
async def init(app, loop):
history = {}

wrapped_model = LightWrapper(MyWorker, config, batch_size=10, max_delay=0.05)
app.ctx.wrapped_model = wrapped_model
app.ctx.history = history


if __name__ == '__main__':
app.run(port=5008)

未使用批量预测的情况下,模型占用显存6g,并发大约在20。替换成批量预测之后,并发可以达到150,提升效果显著。

参考:https://zhuanlan.zhihu.com/p/382306521

常见的利用github或者gitee搭建图床方式,总会遇到一些奇怪的原因导致无法使用,比如gitee的防盗链,github科学上网问题。我理解的图床就是一个简单的文件服务器,只是这个文件服务器只存储图片而已,有了这个认识,我们可以利用闲置的服务器(云服务器或者自家电脑内网穿透方式)来搭建一个文件服务器。步骤如下:

  1. 搭建文件服务器

    该工具目前为linux 版本,windows无法使用

    • 下载文件服务器工具:链接: https://pan.baidu.com/s/14EIL3ryx45OApcMV7S6LnQ 提取码: vuny

    • 修改配置文件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      #服务监听店端口
      port=8080
      #上传文件存放路径
      uploadDir=/home/share
      #上传 http请求路由
      route=/upload
      #是否token开启验证
      auth=false
      #验证token值,与客户端上传比对
      token=helloworld
      #end
    • 启动服务

      1
      ./CloudDiskWeb config.ini

      访问图片url:http://host:8080/xxx.png

      上传路由为:http://host:8080/upload

    1. 编写自动上传文件脚本 upload.py

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      import requests
      import sys

      upload_url = 'http://132.232.11.158:5008/upload'


      def upload_images(images, dir='/sunshine'):
      for img in images:
      file_name = img.split('/')[-1]
      files = [
      ('file', (file_name, open(img, 'rb'), 'image/png'))
      ]

      response = requests.request("POST", upload_url + dir, files=files)
      if response.status_code == 200:
      path = f'{dir}/{file_name}'
      else:
      path = ""
      print(f"{upload_url.strip('/upload')}{path}") # 一定要输出文件路径,不然typora无法获取到新图片的url


      if __name__ == '__main__':
      images = sys.argv[1:]
      upload_images(images)

    2. typora配置

      文件->偏好设置->图像->上传服务 选择自定义命令,在命令中输入python %home/upload.py

      image-20230315152638971

      到此,所有配置完成。

      粘贴图片到typora后,自动上传并显示新的图片url。

最近项目需要,在sanic中搭配使用mqtt,由于sanic是一个协诚异步框架,所以也必须使用mqtt的异步版本,常见的有:hbmqtt (项目已经不更新,版本太低不匹配),asyncio-mqttasyncio-paho。这里我们使用asyncio-paho库。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import json
from sanic import Sanic, response
from asyncio_paho import AsyncioPahoClient

app = Sanic(__name__)

# mqtt配置
host = '192.168.0.15'
port = 1883
topic = 'mytopic'
client_id = 'fsdfdsdsfs'


@app.listener('before_server_start')
async def server_init(app, loop):
app.client = AsyncioPahoClient(client_id, loop=loop)
app.client.asyncio_listeners.add_on_connect(on_connect_async)
app.client.asyncio_listeners.add_on_message(on_message_async)
await app.client.asyncio_connect(host=host, port=port)


async def on_connect_async(client, userdata, flags_dict, result):
await client.asyncio_subscribe(topic=topic)


async def on_message_async(client, userdata, msg):
"""
消息处理函数
"""
print('====', json.loads(msg.payload))


@app.route('/f1', methods=['post'])
async def f1(request):
d = request.json
request.app.client.publish(
topic,
payload=json.dumps(d)
)
return response.text('success')


if __name__ == '__main__':
app.run('0.0.0.0', port=3331)

可以看到,代码比较简单,核心在于服务启动之前,初始化好mqtt客户端即可。

启动服务,调用http://localhost:3331/f1 。控制台输出订阅的消息

1
2
[2022-07-13 15:20:03 +0800] - (sanic.access)[INFO][127.0.0.1:52116]: POST http://localhost:3331/f1  200 7
==== {'key': '1234567'}

成功~

针对于客户纯内网环境部署python项目,最优的解决方式当然是docker部署。若使用宿主机部署方式,如何解决依赖问题就是一个大问题,常见的部署包管理器有:apt,yum。这里以Ubuntu为例,示例如何解决依赖问题。

1. 系统模块依赖

针对系统包的缺失,可以有两种方式解决:

  • 搭建系统源

  • 准备离线依赖包

    1. apt-get命令提供了download-only参数,可以把安装模块所需要的所有依赖包下载到本地,如:

      1
      apt-get install --download-only build-essential

      下载好的包默认路径为:/var/cache/apt/archives

    2. 将下载的依赖包打包拷贝到目标服务器

    3. 安装

      1
      dpkg -i *.deb

2. python包依赖

备注:导出项目依赖包

1
pipreqs ./ --encoding=utf-8

和系统依赖流程类似,python依赖解决也有两种办法:

  • 搭建pypi源

    下载pypi仓库就是一个庞大的工程,部署项目不推荐。

  • 离线安装依赖包

    1. pip download -r requirement.txt

      通过download,将项目需要的依赖包下载到本地

    2. 将下载好的依赖包拷贝到目标服务器

    3. 安装

      1
      ls | grep "*.whl" | xargs pip install

      若依赖包目录中包含源码安装的包,如:xxx.tar.gz,可以一起安装

      1
      ls | xargs pip install

OpenVINO是英特尔针对自家硬件平台开发的一套深度学习工具库,包含推理库,模型优化等等一系列与深度学习模型部署相关的功能。这里介绍如何用docker部署OpenVINO的推理模型。

1. arm架构编译安装

目前并没有arm架构的openvino包,需要自己编译安装。。。

  1. 拉取源码

    1
    2
    3
    git clone https://github.com/openvinotoolkit/openvino.git
    cd openvino
    git submodule update --init --recursive
  2. 安装依赖

    1
    2
    chmod +x install_build_dependencies.sh
    ./install_build_dependencies.sh
  3. 编译

    1
    2
    3
    mkdir build && cd build
    cmake -DCMAKE_BUILD_TYPE=Release ..
    make --jobs=$(nproc --all)

基本上等到编译结束,快的话个把小时过去了。如果有vpn,挂上vpn安装依赖会快一些。

  1. 编码python包

    1
    pip install Cython
    1
    2
    cd %openvino/src/bindings/python/wheel
    python setup.py install

    一切顺利的话,个把小时又过去了。。。

  2. 验证

    1
    from openvino.runtime import Core

    如果报错:ImportError: libopenvino.so: cannot open shared object file: No such file or directory

    libopenvino.so 拷贝到 $site-packages%/openvino/libs/libopenvino.so 即可

2. x86架构安装

1
pip install openvino

3. 镜像方式

  1. 拉取镜像

    1
    docker pull openvino/onnxruntime_ep_ubuntu18
  2. 启动镜像

    1
    docker run -it $image_name /bin/bash

    注意:该镜像并未安装opencv-python,需要自己安装

4. 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import cv2
import numpy as np
from openvino.runtime import Core

ie = Core()
model = ie.read_model(model="yolov4-tiny-obj_best.onnx")
compiled_model = ie.compile_model(model=model, device_name="CPU")


image = cv2.cvtColor(cv2.imread(filename="fire1.png"), code=cv2.COLOR_BGR2RGB)
image = cv2.resize(src=image, dsize=(608, 608))
img = np.transpose(image, (2, 0, 1)).astype(np.float32)
img = np.expand_dims(img, axis=0)
img /= 255.0
img = np.ascontiguousarray(img)

result_infer = compiled_model([img])
print(list(result_infer.values()))

python项目交付给客户时,为了安全起见,可以将其加密,而常见的加密方式pyc并不安全,可以很轻易的被破解。本文提供了一种对python项目加密的方式,为每个py文件加密为so文件,难以破解。同时,本文提供了一种license授权机制,可以指定加密项目在指定机器上运行,且可以配置授权时间。

项目结构如下:

1
2
3
4
5
6
7
8
9
10
11
├── app
│   ├── create_app.py
│   ├── __init__.py
│   ├── license_utils.py
│   ├── manage
│   │   ├── __init__.py
│   │   ├── m1.py
├── readme.md
├── run.py
└── setup.py

项目地址:https://github.com/fushengwuyu/encrypt_license.git

1. 生成license文件

license代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import re
import sys
import datetime
import subprocess
from Crypto.Cipher import AES
from binascii import a2b_hex
from binascii import b2a_hex


class LicenseEncode:
def __init__(self, mac, license_path, expired_date=None):
self.mac = mac
self.license_path = license_path
self.date = (datetime.datetime.now() + datetime.timedelta(days=30)).strftime(
'%Y%m%d') if expired_date is None else expired_date

def encrypt(self, content):
# content length must be a multiple of 16.
while len(content) % 16:
content += ' '
content = content.encode('utf-8')
# Encrypt content.
aes = AES.new(b'2021052020210520', AES.MODE_CBC, b'2021052020210520')
encrypted_content = aes.encrypt(content)
return (b2a_hex(encrypted_content))

def gen_license_file(self):
with open(self.license_path, 'w') as LF:
LF.write('MAC : %s\n' % (self.mac))

LF.write('Date : %s\n' % (self.date))

sign = self.encrypt('%s#%s' % (self.mac, self.date))
print('Sign : ' + str(sign.decode('utf-8')) + '\n')
LF.write('Sign : ' + str(sign.decode('utf-8')) + '\n')


class LicenseDecode:
def __init__(self, license_path):
self.license_path = license_path

def license_check(self):
license_dic = self.parse_license_file()
sign = self.decrypt(license_dic['Sign'])
sign_list = sign.split('#')
mac = sign_list[0].strip()
date = sign_list[1].strip()
if (mac != license_dic['MAC']) or (date != license_dic['Date']):
print('*Error*: License file is modified!')
sys.exit(1)
# Check MAC and effective date invalid or not.
if len(sign_list) == 2:
macs = self.get_mac()
current_date = datetime.datetime.now().strftime('%Y%m%d')
if sign_list[0] not in macs:
print('*Error*: Invalid host!')
sys.exit(1)
# Current time must be before effective date.

if sign_list[1] < current_date:
print('*Error*: License is expired!')
sys.exit(1)
else:
print('*Error*: Wrong Sign setting on license file.')
sys.exit(1)

def parse_license_file(self):
license_dic = {}

with open(self.license_path, 'r') as LF:
for line in LF.readlines():
if re.match('^\s*(\S+)\s*:\s*(\S+)\s*$', line):
my_match = re.match('^\s*(\S+)\s*:\s*(\S+)\s*$', line)
license_dic[my_match.group(1)] = my_match.group(2)
return (license_dic)

def decrypt(self, content):
aes = AES.new(b'2021052020210520', AES.MODE_CBC, b'2021052020210520')
decrypted_content = aes.decrypt(a2b_hex(content.encode('utf-8')))
return (decrypted_content.decode('utf-8'))

def get_mac(self):
SP = subprocess.Popen('/sbin/ifconfig', shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(stdout, stderr) = SP.communicate()
macs = []
for line in str(stdout, 'utf-8').split('\n'):
if re.match('^\s*ether\s+(\S+)\s+.*$', line):
my_match = re.match('^\s*ether\s+(\S+)\s+.*$', line)
mac = my_match.group(1)
macs.append(mac)
return macs


if __name__ == '__main__':
# make license file
mac = '00:28:f8:fa:25:bc' # 改为需授权的主机的mac地址

# 第一步,生成license文件
LicenseEncode(mac, './License.dat').gen_license_file()

执行脚本,生成license.dat的授权文件。

2. 验证license

执行run.py

1
2
3
4
5
6
7
8
9
10
/home/sunshine/python/encrypt_license/app/license_utils.py:75: DeprecationWarning: invalid escape sequence \s
if re.match('^\s*(\S+)\s*:\s*(\S+)\s*$', line):
[2022-04-22 10:13:28 +0800] [201518] [INFO] Sanic v21.12.0
[2022-04-22 10:13:28 +0800] [201518] [INFO] Goin' Fast @ http://0.0.0.0:5008
[2022-04-22 10:13:28 +0800] [201518] [INFO] mode: production, single worker
[2022-04-22 10:13:28 +0800] [201518] [INFO] server: sanic
[2022-04-22 10:13:28 +0800] [201518] [INFO] python: 3.8.10
[2022-04-22 10:13:28 +0800] [201518] [INFO] platform: Linux-5.13.0-39-generic-x86_64-with-glibc2.29
[2022-04-22 10:13:28 +0800] [201518] [INFO] packages: sanic-routing==0.7.1
[2022-04-22 10:13:28 +0800] [201518] [INFO] Starting worker [201518]

验证成功。

尝试修改license.dat

1
*Error*: License file is modified!

3. 加密整个项目

执行setup.py

1
2
3
x86_64-linux-gnu-gcc -pthread -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat -Werror=format-security -g -fwrapv -O2 -g -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -fPIC -I/home/sunshine/envs/py3.8/include -I/usr/include/python3.8 -c setup.c -o build/temp/setup.o
x86_64-linux-gnu-gcc -pthread -shared -Wl,-O1 -Wl,-Bsymbolic-functions -Wl,-Bsymbolic-functions -Wl,-z,relro -g -fwrapv -O2 -Wl,-Bsymbolic-functions -Wl,-z,relro -g -fwrapv -O2 -g -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 build/temp/setup.o -o build/setup.cpython-38-x86_64-linux-gnu.so
complate! time: 7.539966344833374 s

执行完成之后,会在当前目录生成build文件夹,该文件夹就是加密后的项目,将整个build文件夹交付客户即可。

build文件夹目录如下:

1
2
3
4
5
6
7
8
9
10
11
12
(py3.8) sunshine@sunshine-ThinkPad-T470p:~/python/encrypt_license$ tree build/
build/
├── app
│   ├── create_app.cpython-38-x86_64-linux-gnu.so
│   ├── license_utils.cpython-38-x86_64-linux-gnu.so
│   └── manage
│   └── m1.cpython-38-x86_64-linux-gnu.so
├── License.dat
├── readme.md
├── run.cpython-38-x86_64-linux-gnu.so
├── run.py
└── setup.cpython-38-x86_64-linux-gnu.so

验证一下build/run.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Sanic v21.12.0 │
│ Goin' Fast @ http://0.0.0.0:5008 │
├───────────────────────┬─────────────────────────────────────────────────────────┤
│ │ mode: production, single worker │
│ ▄███ █████ ██ │ server: sanic │
│ ██ │ python: 3.8.10 │
│ ▀███████ ███▄ │ platform: Linux-5.13.0-39-generic-x86_64-with-glibc2.29 │
│ ██ │ packages: sanic-routing==0.7.1 │
│ ████ ████████▀ │ │
│ │ │
│ Build Fast. Run Fast. │ │
└───────────────────────┴─────────────────────────────────────────────────────────┘

[2022-04-22 10:21:13 +0800] [202771] [WARNING] Sanic is running in PRODUCTION mode. Consider using '--debug' or '--dev' while actively developing your application.
[2022-04-22 10:21:13 +0800] [202771] [INFO] Starting worker [202771]

成功~~