Integrating Operators into Inference Frameworks¶
This document describes in detail how to integrate PTO operators into mainstream inference frameworks (PyTorch, TensorFlow, ONNX Runtime, etc.) to enable end-to-end model deployment.
1. Integration Overview¶
1.1 Integration Architecture¶
┌─────────────────────────────────────────────────────────────────┐
│ Application layer (Python / C++)                                 │
│ Model definition, training, inference                            │
└──────────────────────────┬──────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│ Framework layer (PyTorch / TensorFlow / ONNX)                    │
│ Operator registration, graph optimization, memory management     │
└──────────────────────────┬──────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│ PTO operator layer (C++ / CUDA)                                  │
│ Custom operator implementation, kernel launch                    │
└──────────────────────────┬──────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│ Hardware layer (NPU / GPU / CPU)                                 │
│ Instruction execution, data transfer                             │
└─────────────────────────────────────────────────────────────────┘
1.2 Comparison of Integration Approaches¶
| Integration approach | Pros | Cons | Typical scenarios |
|---|---|---|---|
| Python extension | Fast to develop, easy to debug | Higher runtime overhead | Prototyping, quick validation |
| C++ extension | High performance, type safety | More complex development, slower builds | Production, performance-critical paths |
| JIT compilation | Flexible, dynamic optimization | Slow first run | Dynamic graphs, research experiments |
| AOT compilation | Fast startup, ahead-of-time optimization | Less flexible | Static graphs, deployment environments |
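As a concrete illustration of the JIT row above, PyTorch can compile and load a C++ extension at runtime with torch.utils.cpp_extension.load. This is only a minimal sketch; the source file name and the PTO include/library paths are placeholders:
from torch.utils.cpp_extension import load

# Built on first use and cached afterwards (JIT compilation)
my_ops = load(
    name="my_pto_ops_jit",
    sources=["my_ops.cpp"],
    extra_include_paths=["/path/to/pto-isa/include"],
    extra_ldflags=["-L/path/to/pto-isa/lib", "-lpto"],
    extra_cflags=["-std=c++20", "-O3"],
    verbose=True,
)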
1.3 Integration Workflow¶
1. Define the operator interface
   ├─ Input/output tensor specifications
   ├─ Parameter types and default values
   └─ Operator attributes (inplace, deterministic)
2. Implement the operator logic
   ├─ Forward computation
   ├─ Backward pass (for training)
   └─ Shape inference
3. Register the operator
   ├─ Framework operator registration
   ├─ Backend binding
   └─ Type inference
4. Test and validate
   ├─ Unit tests
   ├─ Numerical correctness
   └─ Performance benchmarks
5. Write documentation and examples
   ├─ API documentation
   ├─ Usage examples
   └─ Performance report
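The sketch below maps steps 1 to 3 of this workflow onto PyTorch's torch.library API, using a pure-Python CPU placeholder instead of a real PTO kernel. The mylib namespace and the my_scale operator are illustrative only; Section 2 shows the real torch_npu path:
import torch

# Step 1: define the operator interface (inputs/outputs, parameter types, defaults)
lib = torch.library.Library("mylib", "DEF")
lib.define("my_scale(Tensor x, float alpha=1.0) -> Tensor")

# Step 2: implement the operator logic (placeholder; a real backend would launch a PTO kernel)
def my_scale_cpu(x, alpha=1.0):
    return x * alpha

lib.impl("my_scale", my_scale_cpu, "CPU")

# Step 3: register a Meta implementation so shape inference and tracing work
def my_scale_meta(x, alpha=1.0):
    return torch.empty_like(x)

lib.impl("my_scale", my_scale_meta, "Meta")

# Step 4 would follow with unit tests and benchmarks (see Section 7)
out = torch.ops.mylib.my_scale(torch.randn(4, 4), alpha=2.0)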
2. PyTorch Integration¶
2.1 Integration via torch_npu¶
Step 1: Define the operator schema¶
// my_ops.cpp
#include <torch/extension.h>
#include <torch_npu/csrc/framework/utils/OpAdapter.h>

// Define the operator schemas
TORCH_LIBRARY_FRAGMENT(npu, m) {
  // Basic operator
  m.def("my_add(Tensor x, Tensor y) -> Tensor");
  // With a scalar parameter
  m.def("my_mul(Tensor x, Scalar alpha) -> Tensor");
  // Multiple outputs
  m.def("my_split(Tensor x, int dim) -> (Tensor, Tensor)");
  // Inplace operator
  m.def("my_relu_(Tensor(a!) self) -> Tensor(a!)");
  // Optional parameters
  m.def("my_conv(Tensor input, Tensor weight, Tensor? bias=None, "
        "int stride=1, int padding=0) -> Tensor");
}
Step 2: Implement the operator¶
A simple operator implementation:
#include <pto/pto-inst.hpp>

// PTO kernel implementation
__global__ __aicore__ void MyAddKernel(
    __gm__ float* out,
    __gm__ const float* x,
    __gm__ const float* y,
    uint32_t length) {
  int block_idx = get_block_idx();
  int block_num = get_block_num();
  int elements_per_block = (length + block_num - 1) / block_num;
  int start = block_idx * elements_per_block;
  int end = min(start + elements_per_block, length);

  using TileT = Tile<TileType::Vec, float, 16, 256>;
  for (int i = start; i < end; i += 16 * 256) {
    int size = min(16 * 256, end - i);  // elements in this (possibly partial) tile
    TileT tile_x, tile_y, tile_out;
    TLOAD(tile_x, GlobalTensor(x + i));
    TLOAD(tile_y, GlobalTensor(y + i));
    TADD(tile_out, tile_x, tile_y);
    TSTORE(GlobalTensor(out + i), tile_out);
  }
}
// PyTorch operator implementation
at::Tensor my_add_impl(const at::Tensor& x, const at::Tensor& y) {
  // Validate inputs
  TORCH_CHECK(x.device() == y.device(), "Inputs must be on same device");
  TORCH_CHECK(x.sizes() == y.sizes(), "Inputs must have same shape");
  TORCH_CHECK(x.scalar_type() == at::kFloat, "Only float32 supported");

  // Allocate the output
  at::Tensor out = at::empty_like(x);

  // Get raw data pointers
  float* out_ptr = out.data_ptr<float>();
  const float* x_ptr = x.data_ptr<float>();
  const float* y_ptr = y.data_ptr<float>();
  uint32_t length = x.numel();

  // Launch the kernel
  int block_num = 24;  // number of cores on A3
  EXEC_KERNEL_CMD(MyAddKernel, block_num, out_ptr, x_ptr, y_ptr, length);

  return out;
}
A more complex operator (with backward pass):
// Forward
class MyConvFunction : public torch::autograd::Function<MyConvFunction> {
 public:
  static at::Tensor forward(
      torch::autograd::AutogradContext* ctx,
      const at::Tensor& input,
      const at::Tensor& weight,
      const at::Tensor& bias,
      int stride,
      int padding) {
    // Save tensors needed for the backward pass
    ctx->save_for_backward({input, weight, bias});
    ctx->saved_data["stride"] = stride;
    ctx->saved_data["padding"] = padding;

    // Call the PTO kernel
    at::Tensor output = run_conv_forward(input, weight, bias, stride, padding);
    return output;
  }

  static std::vector<at::Tensor> backward(
      torch::autograd::AutogradContext* ctx,
      std::vector<at::Tensor> grad_outputs) {
    // Restore the saved tensors
    auto saved = ctx->get_saved_variables();
    auto input = saved[0];
    auto weight = saved[1];
    auto bias = saved[2];
    int stride = ctx->saved_data["stride"].toInt();
    int padding = ctx->saved_data["padding"].toInt();
    auto grad_output = grad_outputs[0];

    // Compute gradients
    at::Tensor grad_input = run_conv_backward_input(
        grad_output, weight, stride, padding);
    at::Tensor grad_weight = run_conv_backward_weight(
        grad_output, input, stride, padding);
    at::Tensor grad_bias = run_conv_backward_bias(grad_output);

    return {grad_input, grad_weight, grad_bias,
            at::Tensor(), at::Tensor()};  // no gradients for stride and padding
  }
};

// Wrapper function
at::Tensor my_conv(
    const at::Tensor& input,
    const at::Tensor& weight,
    const at::Tensor& bias,
    int stride,
    int padding) {
  return MyConvFunction::apply(input, weight, bias, stride, padding);
}
Step 3: Register the implementations¶
// Register with the NPU backend
TORCH_LIBRARY_IMPL(npu, PrivateUse1, m) {
  m.impl("my_add", TORCH_FN(my_add_impl));
  m.impl("my_mul", TORCH_FN(my_mul_impl));
  m.impl("my_conv", TORCH_FN(my_conv));
}

// Register the autograd implementation
TORCH_LIBRARY_IMPL(npu, Autograd, m) {
  m.impl("my_conv", TORCH_FN(my_conv));
}
Step 4: Build as a Python extension¶
setup.py:
from setuptools import setup
from torch.utils.cpp_extension import BuildExtension, CppExtension

setup(
    name='my_pto_ops',
    ext_modules=[
        CppExtension(
            name='my_pto_ops',
            sources=['my_ops.cpp'],
            include_dirs=[
                '/path/to/pto-isa/include',
                '/path/to/torch_npu/include',
            ],
            library_dirs=[
                '/path/to/pto-isa/lib',
            ],
            libraries=['pto'],
            extra_compile_args=['-std=c++20', '-O3'],
        )
    ],
    cmdclass={'build_ext': BuildExtension}
)
Build:
python setup.py install
Step 5: Use from Python¶
import torch
import torch_npu
import my_pto_ops

# Create inputs
x = torch.randn(1024, 1024).npu()
y = torch.randn(1024, 1024).npu()

# Call the custom operator
z = torch.ops.npu.my_add(x, y)

# Verify the result
expected = x + y
assert torch.allclose(z, expected, rtol=1e-5)
print("✓ Custom op works correctly!")
2.2 Integration via torch.library (PyTorch 2.4+)¶
A more concise registration path (the torch.library.custom_op decorator requires PyTorch 2.4 or later):
import torch
from torch.library import custom_op

@custom_op("mylib::my_add", mutates_args=())
def my_add(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    """Custom add operator (delegates to a separately registered backend kernel)."""
    return torch.ops.mylib.my_add_impl(x, y)

@my_add.register_fake
def _(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    """Shape inference for fake/meta tensors."""
    assert x.shape == y.shape
    return torch.empty_like(x)

# Usage
x = torch.randn(10, 10)
y = torch.randn(10, 10)
z = torch.ops.mylib.my_add(x, y)
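If the operator also needs to work under autograd, PyTorch 2.4+ lets you attach a backward formula to the same custom op. A minimal sketch for the addition above, where the gradient with respect to both inputs is just the incoming gradient:
def setup_context(ctx, inputs, output):
    # Nothing has to be saved for a plain addition
    pass

def backward(ctx, grad):
    # d(x + y)/dx = 1 and d(x + y)/dy = 1, so both inputs receive grad unchanged
    return grad, grad

my_add.register_autograd(backward, setup_context=setup_context)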
2.3 Complete Example: The Add Operator¶
For a detailed tutorial, see demos/baseline/add/README_zh.md.
3. TensorFlow Integration¶
3.1 Custom Op¶
Step 1: Define the op¶
// my_ops.cc
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/shape_inference.h"

REGISTER_OP("MyAdd")
    .Input("x: float")
    .Input("y: float")
    .Output("z: float")
    .SetShapeFn([](::tensorflow::shape_inference::InferenceContext* c) {
      // Shape inference: the output has the same shape as the first input
      c->set_output(0, c->input(0));
      return tensorflow::Status::OK();
    })
    .Doc(R"doc(
Custom elementwise add operator.

Args:
  x: the first input tensor
  y: the second input tensor

Returns:
  z: x + y
)doc");
Step 2: Implement the kernel¶
#include "tensorflow/core/framework/op_kernel.h"
#include <pto/pto-inst.hpp>
class MyAddOp : public tensorflow::OpKernel {
public:
explicit MyAddOp(tensorflow::OpKernelConstruction* context)
: OpKernel(context) {}
void Compute(tensorflow::OpKernelContext* context) override {
// 获取输入
const tensorflow::Tensor& x = context->input(0);
const tensorflow::Tensor& y = context->input(1);
// 检查形状
OP_REQUIRES(context, x.shape() == y.shape(),
tensorflow::errors::InvalidArgument(
"Inputs must have same shape"));
// 分配输出
tensorflow::Tensor* z = nullptr;
OP_REQUIRES_OK(context, context->allocate_output(0, x.shape(), &z));
// 调用 PTO kernel
const float* x_ptr = x.flat<float>().data();
const float* y_ptr = y.flat<float>().data();
float* z_ptr = z->flat<float>().data();
uint32_t length = x.NumElements();
EXEC_KERNEL_CMD(MyAddKernel, 24, z_ptr, x_ptr, y_ptr, length);
}
};
// 注册 kernel
REGISTER_KERNEL_BUILDER(
Name("MyAdd").Device(tensorflow::DEVICE_NPU),
MyAddOp);
Step 3: Compile¶
# Use TensorFlow's compile and link flags
TF_CFLAGS=( $(python -c 'import tensorflow as tf; print(" ".join(tf.sysconfig.get_compile_flags()))') )
TF_LFLAGS=( $(python -c 'import tensorflow as tf; print(" ".join(tf.sysconfig.get_link_flags()))') )

g++ -std=c++17 -shared my_ops.cc -o my_ops.so \
    ${TF_CFLAGS[@]} ${TF_LFLAGS[@]} \
    -I/path/to/pto-isa/include \
    -L/path/to/pto-isa/lib -lpto \
    -fPIC -O3
Step 4: Use from Python¶
import tensorflow as tf

# Load the custom op library
my_ops = tf.load_op_library('./my_ops.so')

# Use it
x = tf.constant([[1.0, 2.0], [3.0, 4.0]])
y = tf.constant([[5.0, 6.0], [7.0, 8.0]])
z = my_ops.my_add(x, y)
print(z.numpy())
# [[ 6.  8.]
#  [10. 12.]]
3.2 Registering a Gradient¶
@tf.RegisterGradient("MyAdd")
def _my_add_grad(op, grad):
    """Gradient of MyAdd."""
    return grad, grad  # ∂z/∂x = 1, ∂z/∂y = 1
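With the gradient registered, a quick end-to-end check is to differentiate through the op with tf.GradientTape, reusing the my_ops library loaded in the previous step:
import tensorflow as tf

x = tf.constant([[1.0, 2.0], [3.0, 4.0]])
y = tf.constant([[5.0, 6.0], [7.0, 8.0]])

with tf.GradientTape() as tape:
    tape.watch([x, y])  # constants must be watched explicitly
    z = my_ops.my_add(x, y)
    loss = tf.reduce_sum(z)

grad_x, grad_y = tape.gradient(loss, [x, y])
# Both gradients should be all-ones tensors of shape (2, 2)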
4. ONNX Runtime Integration¶
4.1 Custom Execution Provider¶
Step 1: Define the kernel¶
// my_onnx_ops.cc
#include "onnxruntime/core/framework/op_kernel.h"

// ORT kernel wrapper around the PTO device kernel MyAddKernel (see Section 2)
class MyAddOpKernel : public onnxruntime::OpKernel {
 public:
  MyAddOpKernel(const onnxruntime::OpKernelInfo& info) : OpKernel(info) {}

  onnxruntime::Status Compute(onnxruntime::OpKernelContext* context) const override {
    // Fetch the inputs
    const onnxruntime::Tensor* X = context->Input<onnxruntime::Tensor>(0);
    const onnxruntime::Tensor* Y = context->Input<onnxruntime::Tensor>(1);

    // Allocate the output
    onnxruntime::Tensor* Z = context->Output(0, X->Shape());

    // Call the PTO kernel
    const float* x_data = X->Data<float>();
    const float* y_data = Y->Data<float>();
    float* z_data = Z->MutableData<float>();
    size_t length = X->Shape().Size();
    EXEC_KERNEL_CMD(MyAddKernel, 24, z_data, x_data, y_data, length);

    return onnxruntime::Status::OK();
  }
};
Step 2: Register the kernel¶
ONNX_OPERATOR_KERNEL_EX(
    Add,
    kOnnxDomain,
    7,  // opset version
    kNpuExecutionProvider,
    MyAddOpKernel);
Step 3: Create the execution provider¶
class NpuExecutionProvider : public onnxruntime::IExecutionProvider {
 public:
  NpuExecutionProvider() : IExecutionProvider(kNpuExecutionProvider) {}

  std::vector<std::unique_ptr<onnxruntime::ComputeCapability>>
  GetCapability(const onnxruntime::GraphViewer& graph,
                const std::vector<const onnxruntime::KernelRegistry*>& registries) const override {
    // Return the set of nodes this provider can run
    // ...
  }
};
Step 4: Use from Python¶
import onnxruntime as ort

# Load the custom operator library
session_options = ort.SessionOptions()
session_options.register_custom_ops_library('my_onnx_ops.so')

# Create the session
session = ort.InferenceSession(
    'model.onnx',
    session_options,
    providers=['NpuExecutionProvider', 'CPUExecutionProvider']
)

# Run inference
outputs = session.run(None, {'input': input_data})
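A simple way to validate the custom execution provider is to run the same model on the CPU provider and compare outputs numerically (model.onnx and input_data are the same placeholders as above):
import numpy as np

# Reference run on the default CPU execution provider
cpu_session = ort.InferenceSession('model.onnx', providers=['CPUExecutionProvider'])
ref_outputs = cpu_session.run(None, {'input': input_data})

# The NPU results should match the CPU reference within tolerance
np.testing.assert_allclose(outputs[0], ref_outputs[0], rtol=1e-5, atol=1e-6)
print("NpuExecutionProvider matches the CPU reference")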
5. Inference Engine Integration¶
5.1 MindSpore Lite Integration¶
// Register a custom operator
#include "include/registry/register_kernel.h"

class MyAddKernel : public mindspore::kernel::Kernel {
 public:
  int Prepare() override { return RET_OK; }

  int Execute() override {
    auto input0 = in_tensors_[0];
    auto input1 = in_tensors_[1];
    auto output = out_tensors_[0];
    // Call the PTO kernel
    // ...
    return RET_OK;
  }
};

// Register
REGISTER_CUSTOM_KERNEL(NPU, MyProvider, kNumberTypeFloat32, Add, MyAddKernel)
5.2 TensorRT Integration¶
// Custom plugin
class MyAddPlugin : public nvinfer1::IPluginV2DynamicExt {
 public:
  int enqueue(const nvinfer1::PluginTensorDesc* inputDesc,
              const nvinfer1::PluginTensorDesc* outputDesc,
              const void* const* inputs,
              void* const* outputs,
              void* workspace,
              cudaStream_t stream) noexcept override {
    // Call the PTO kernel
    // ...
    return 0;
  }
};

// Register
REGISTER_TENSORRT_PLUGIN(MyAddPluginCreator);
6. Performance Optimization¶
6.1 Operator Fusion¶
# PyTorch example: fuse Add + ReLU
@torch.jit.script
def fused_add_relu(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    return torch.relu(x + y)

# Or replace the pattern with a custom fused operator
torch.ops.npu.fused_add_relu(x, y)
6.2 Memory Optimization¶
// Inplace operator
at::Tensor& my_add_inplace(at::Tensor& x, const at::Tensor& y) {
  // Write the result directly into x to avoid allocating a new buffer
  float* x_ptr = x.data_ptr<float>();
  const float* y_ptr = y.data_ptr<float>();
  uint32_t length = x.numel();
  EXEC_KERNEL_CMD(MyAddInplaceKernel, 24, x_ptr, y_ptr, length);
  return x;
}
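From Python, the inplace variant avoids the extra allocation entirely. A usage sketch, assuming the kernel above is exposed under a (hypothetical) schema npu::my_add_(Tensor(a!) self, Tensor other) -> Tensor(a!):
import torch
import torch_npu

x = torch.randn(1024, 1024).npu()
y = torch.randn(1024, 1024).npu()

ptr_before = x.data_ptr()
torch.ops.npu.my_add_(x, y)        # result is written back into x
assert x.data_ptr() == ptr_before  # no new buffer was allocated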
6.3 Asynchronous Execution¶
// Use a CUDA stream (or NPU stream)
at::Tensor my_add_async(const at::Tensor& x, const at::Tensor& y) {
  at::Tensor out = at::empty_like(x);

  // Get the current stream
  auto stream = at::cuda::getCurrentCUDAStream();

  // Launch the kernel asynchronously
  EXEC_KERNEL_ASYNC(MyAddKernel, 24, stream,
                    out.data_ptr<float>(),
                    x.data_ptr<float>(),
                    y.data_ptr<float>(),
                    x.numel());
  return out;
}
7. Debugging and Testing¶
7.1 Unit Tests¶
import unittest
import torch
import torch_npu
import my_pto_ops

class TestMyOps(unittest.TestCase):
    def test_my_add(self):
        x = torch.randn(100, 100).npu()
        y = torch.randn(100, 100).npu()

        # Custom operator
        z_custom = torch.ops.npu.my_add(x, y)
        # Reference implementation
        z_ref = x + y

        # Verify
        self.assertTrue(torch.allclose(z_custom, z_ref, rtol=1e-5))

    def test_my_add_backward(self):
        # Create leaf tensors on the NPU so gradients accumulate on them
        x = torch.randn(100, 100).npu().requires_grad_()
        y = torch.randn(100, 100).npu().requires_grad_()

        z = torch.ops.npu.my_add(x, y)
        loss = z.sum()
        loss.backward()

        # Verify gradients
        self.assertIsNotNone(x.grad)
        self.assertIsNotNone(y.grad)
        self.assertTrue(torch.allclose(x.grad, torch.ones_like(x)))

if __name__ == '__main__':
    unittest.main()
7.2 Performance Benchmarking¶
import torch
import torch_npu
import time

def benchmark(func, *args, warmup=10, iterations=100):
    # Warm up
    for _ in range(warmup):
        func(*args)

    # Synchronize before timing
    torch.npu.synchronize()

    # Measure
    start = time.perf_counter()
    for _ in range(iterations):
        func(*args)
    torch.npu.synchronize()
    end = time.perf_counter()

    avg_time = (end - start) / iterations * 1000  # ms
    return avg_time

# Compare performance
x = torch.randn(1024, 1024).npu()
y = torch.randn(1024, 1024).npu()

time_custom = benchmark(lambda: torch.ops.npu.my_add(x, y))
time_builtin = benchmark(lambda: x + y)

print(f"Custom op: {time_custom:.3f} ms")
print(f"Built-in op: {time_builtin:.3f} ms")
print(f"Speedup: {time_builtin / time_custom:.2f}x")
8. Best Practices¶
8.1 Design Principles¶
✅ DO:

- Keep the operator interface simple and clear
- Provide complete data type coverage (float32, float16, int32, etc.)
- Implement shape inference and type inference
- Provide thorough documentation and usage examples
- Write complete unit tests

❌ DON'T:

- Do not allocate large temporary buffers inside the operator
- Do not assume inputs are always contiguous (call contiguous() when needed)
- Do not ignore edge cases (empty tensors, single-element tensors)
- Do not rely on global state inside the operator
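The wrapper below illustrates the contiguity and empty-tensor points from the DON'T list; it is only a sketch around the my_add operator registered in Section 2:
import torch

def safe_my_add(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    # Edge case: nothing to compute for empty tensors
    if x.numel() == 0:
        return torch.empty_like(x)
    # Never assume the inputs are contiguous
    return torch.ops.npu.my_add(x.contiguous(), y.contiguous())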
8.2 Performance Checklist¶
- [ ] Does the operator support inplace execution?
- [ ] Is operator fusion implemented where applicable?
- [ ] Is execution asynchronous?
- [ ] Are unnecessary memory copies avoided?
- [ ] Are multiple data types supported?
- [ ] Has the operator been benchmarked?
8.3 Compatibility Checklist¶
- [ ] Are dynamic shapes supported?
- [ ] Are broadcasting semantics supported?
- [ ] Is gradient computation supported (for training)?
- [ ] Is JIT compilation supported?
- [ ] Can the operator be exported to ONNX?
- [ ] Is a CPU fallback provided?
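For the last item, a common pattern is a thin dispatch wrapper that falls back to a reference implementation when the input is not on the NPU (a sketch built around the my_add operator from Section 2):
import torch

def my_add(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    if x.device.type == "npu":
        return torch.ops.npu.my_add(x, y)  # PTO kernel path
    return x + y                           # CPU fallback (reference implementation)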