多核并行编程

本文档介绍 PTO 的多核并行编程技术,帮助开发者充分利用 Ascend 多核架构实现高性能算子。

目录


1. 多核架构概述

1.1 Ascend 多核架构

硬件配置: - A2/A3:24 个 AI Core - A5:更多核心(具体数量依型号)

架构特点

┌─────────────────────────────────┐
│         Host CPU                │
└────────────┬────────────────────┘
             │
    ┌────────┴────────┐
    │   NPU Device    │
    │  ┌───┬───┬───┐  │
    │  │C0 │C1 │...│  │  AI Cores
    │  └───┴───┴───┘  │
    │  ┌───────────┐  │
    │  │    GM     │  │  Global Memory
    │  └───────────┘  │
    └─────────────────┘

核心特性: - 每个核心独立执行 - 共享全局内存(GM) - 独立的 L1 缓存 - 通过 GM 进行核间通信

1.2 并行编程模型

两种主要模式

模式 特点 适用场景
SPMD 所有核运行相同代码 规则的数据并行
MPMD 不同核运行不同代码 流水线、生产者-消费者

2. SPMD 编程模式

2.1 基本概念

SPMD (Single Program, Multiple Data): - 所有核心执行相同的程序 - 通过 block_idx 区分不同的数据块 - 最常用的并行模式

2.2 基础示例

向量加法

__global__ __aicore__ void VecAddKernel(__gm__ float* out,
                                        __gm__ const float* in0,
                                        __gm__ const float* in1,
                                        uint32_t totalLength) {
  // 获取当前核心 ID
  int block_idx = get_block_idx();
  int block_num = get_block_num();

  // 计算当前核心负责的数据范围
  int elements_per_block = (totalLength + block_num - 1) / block_num;
  int start = block_idx * elements_per_block;
  int end = min(start + elements_per_block, totalLength);

  // 处理当前块
  for (int i = start; i < end; i += TILE_SIZE) {
    int size = min(TILE_SIZE, end - i);

    using TileT = Tile<TileType::Vec, float, 8, 256>;
    TileT a, b, c;

    TLOAD(a, GlobalTensor(in0 + i));
    TLOAD(b, GlobalTensor(in1 + i));
    TADD(c, a, b);
    TSTORE(GlobalTensor(out + i), c);
  }
}

2.3 2D 数据划分

矩阵乘法示例

__global__ __aicore__ void MatMulKernel(__gm__ float* C,
                                        __gm__ const float* A,
                                        __gm__ const float* B,
                                        int M, int K, int N) {
  // 获取核心 ID
  int block_idx = get_block_idx();

  // 2D 划分:M 和 N 维度
  int blocks_m = (M + TILE_M - 1) / TILE_M;
  int blocks_n = (N + TILE_N - 1) / TILE_N;

  int block_m = block_idx / blocks_n;
  int block_n = block_idx % blocks_n;

  // 计算当前核心负责的矩阵块
  int m_start = block_m * TILE_M;
  int n_start = block_n * TILE_N;

  // 确保不越界
  if (m_start >= M || n_start >= N) return;

  int m_size = min(TILE_M, M - m_start);
  int n_size = min(TILE_N, N - n_start);

  // 执行矩阵乘法
  TileAcc acc;
  TFILL(acc, 0);

  for (int k = 0; k < K; k += TILE_K) {
    int k_size = min(TILE_K, K - k);

    TLOAD(tileA, A[m_start:m_start+m_size, k:k+k_size]);
    TLOAD(tileB, B[k:k+k_size, n_start:n_start+n_size]);
    TMATMUL_ACC(acc, tileA, tileB);
  }

  TSTORE(C[m_start:m_start+m_size, n_start:n_start+n_size], acc);
}

2.4 3D 数据划分

卷积示例

__global__ __aicore__ void ConvKernel(...) {
  int block_idx = get_block_idx();

  // 3D 划分:Batch, Height, Width
  int blocks_h = (H + TILE_H - 1) / TILE_H;
  int blocks_w = (W + TILE_W - 1) / TILE_W;

  int block_b = block_idx / (blocks_h * blocks_w);
  int block_h = (block_idx / blocks_w) % blocks_h;
  int block_w = block_idx % blocks_w;

  // 处理当前 3D 块
  process_conv_block(block_b, block_h, block_w);
}

3. MPMD 编程模式

3.1 基本概念

MPMD (Multiple Program, Multiple Data): - 不同核心执行不同的程序 - 适合流水线和生产者-消费者模式 - 需要核间同步

3.2 任务分派模式

方法1:单入口 + switch

__global__ __aicore__ void MPMDKernel(__gm__ float* out,
                                      __gm__ const float* in,
                                      uint32_t task_id) {
  switch (task_id) {
    case 0:
      ProducerTask(out, in);
      break;
    case 1:
      ConsumerTask(out, in);
      break;
    case 2:
      ProcessorTask(out, in);
      break;
    default:
      break;
  }
}

方法2:多入口

// 生产者 kernel
__global__ __aicore__ void ProducerKernel(...) {
  // 生产数据
  for (int i = 0; i < N; i++) {
    produce_data(buffer[i]);
    signal_consumer();  // 通知消费者
  }
}

// 消费者 kernel
__global__ __aicore__ void ConsumerKernel(...) {
  // 消费数据
  for (int i = 0; i < N; i++) {
    wait_producer();  // 等待生产者
    consume_data(buffer[i]);
  }
}

3.3 流水线模式

三阶段流水线

__global__ __aicore__ void PipelineKernel(__gm__ float* out,
                                          __gm__ const float* in,
                                          uint32_t stage_id) {
  switch (stage_id) {
    case 0:  // Stage 1: Load
      for (int i = 0; i < N; i++) {
        TLOAD(buffer1[i], in[i]);
        signal_stage2();
      }
      break;

    case 1:  // Stage 2: Compute
      for (int i = 0; i < N; i++) {
        wait_stage1();
        TCOMPUTE(buffer2[i], buffer1[i]);
        signal_stage3();
      }
      break;

    case 2:  // Stage 3: Store
      for (int i = 0; i < N; i++) {
        wait_stage2();
        TSTORE(out[i], buffer2[i]);
      }
      break;
  }
}

4. 负载均衡

4.1 静态负载均衡

均匀划分

// 方法1:简单均分
int elements_per_block = totalLength / block_num;
int start = block_idx * elements_per_block;
int end = (block_idx == block_num - 1) ? 
          totalLength : start + elements_per_block;

// 方法2:向上取整均分
int elements_per_block = (totalLength + block_num - 1) / block_num;
int start = block_idx * elements_per_block;
int end = min(start + elements_per_block, totalLength);

2D 均匀划分

// 计算最优的 2D 划分
int blocks_m = (int)sqrt(block_num * M / N);
int blocks_n = block_num / blocks_m;

// 调整以充分利用所有核心
while (blocks_m * blocks_n < block_num && blocks_n < N / TILE_N) {
  blocks_n++;
  blocks_m = block_num / blocks_n;
}

4.2 动态负载均衡

工作窃取模式

// 使用原子操作实现动态任务分配
__gm__ atomic<int> next_task = 0;

__global__ __aicore__ void DynamicKernel(...) {
  while (true) {
    // 原子获取下一个任务
    int task_id = next_task.fetch_add(1);

    if (task_id >= total_tasks) break;

    // 处理任务
    process_task(task_id);
  }
}

4.3 负载不均衡检测

检测方法

// 记录每个核心的执行时间
#ifdef PROFILE
  auto start = GetTime();

  // 执行任务
  process_block(block_idx);

  auto end = GetTime();
  execution_times[block_idx] = end - start;
#endif

// 分析负载均衡性
float max_time = *max_element(execution_times.begin(), 
                              execution_times.end());
float min_time = *min_element(execution_times.begin(), 
                              execution_times.end());
float imbalance = (max_time - min_time) / max_time;

if (imbalance > 0.2) {
  printf("Warning: Load imbalance detected: %.2f%%\n", 
         imbalance * 100);
}

5. 核间通信

5.1 通过全局内存通信

基本模式

// 核心 0:写入数据
__global__ __aicore__ void Writer(__gm__ float* shared_buffer) {
  if (get_block_idx() == 0) {
    TLOAD(tile, local_data);
    TSTORE(shared_buffer, tile);
    // 设置标志表示数据已就绪
    shared_buffer[FLAG_OFFSET] = 1;
  }
}

// 核心 1:读取数据
__global__ __aicore__ void Reader(__gm__ float* shared_buffer) {
  if (get_block_idx() == 1) {
    // 等待数据就绪
    while (shared_buffer[FLAG_OFFSET] != 1) {
      // 自旋等待
    }
    TLOAD(tile, shared_buffer);
    process(tile);
  }
}

5.2 使用原子操作同步

计数器同步

__gm__ atomic<int> counter = 0;

__global__ __aicore__ void SyncKernel(...) {
  // 每个核心完成工作后增加计数器
  process_local_work();

  counter.fetch_add(1);

  // 等待所有核心完成
  while (counter.load() < block_num) {
    // 自旋等待
  }

  // 继续下一阶段
  next_stage_work();
}

5.3 屏障同步

软件屏障

class Barrier {
  __gm__ atomic<int> counter;
  __gm__ atomic<int> generation;
  int num_threads;

public:
  void wait() {
    int gen = generation.load();

    if (counter.fetch_add(1) == num_threads - 1) {
      // 最后一个到达的线程
      counter.store(0);
      generation.fetch_add(1);
    } else {
      // 等待所有线程到达
      while (generation.load() == gen) {
        // 自旋等待
      }
    }
  }
};

__global__ __aicore__ void BarrierKernel(...) {
  Barrier barrier(block_num);

  // 阶段 1
  phase1_work();
  barrier.wait();

  // 阶段 2
  phase2_work();
  barrier.wait();

  // 阶段 3
  phase3_work();
}

6. 性能优化

6.1 减少核间通信

策略1:增大数据块

// 不好:频繁通信
for (int i = 0; i < N; i++) {
  process_small_block(i);
  sync_with_other_cores();  // 每次都同步
}

// 好:批量处理
for (int i = 0; i < N; i += BATCH_SIZE) {
  process_large_block(i, BATCH_SIZE);
  sync_with_other_cores();  // 批量同步
}

策略2:本地化计算

// 尽量让每个核心独立完成工作
__global__ __aicore__ void LocalizedKernel(...) {
  int block_idx = get_block_idx();

  // 每个核心处理完整的子问题
  // 无需与其他核心通信
  process_independent_subproblem(block_idx);
}

6.2 优化数据划分

考虑数据局部性

// 2D 矩阵:按块划分而非按行/列
// 好:每个核心访问连续的内存块
for (int bm = 0; bm < blocks_m; bm++) {
  for (int bn = 0; bn < blocks_n; bn++) {
    int block_id = bm * blocks_n + bn;
    if (block_id == get_block_idx()) {
      process_block(bm, bn);
    }
  }
}

6.3 避免伪共享

问题

// 不好:多个核心写入相邻位置
__gm__ float results[NUM_CORES];

__global__ __aicore__ void BadKernel(...) {
  int idx = get_block_idx();
  results[idx] = compute();  // 可能导致缓存行冲突
}

解决方案

// 好:使用 padding 避免伪共享
constexpr int CACHE_LINE_SIZE = 64;
constexpr int PADDING = CACHE_LINE_SIZE / sizeof(float);

__gm__ float results[NUM_CORES * PADDING];

__global__ __aicore__ void GoodKernel(...) {
  int idx = get_block_idx();
  results[idx * PADDING] = compute();  // 避免缓存行冲突
}

6.4 性能测量

测量核心利用率

#ifdef PROFILE
  __gm__ uint64_t start_times[NUM_CORES];
  __gm__ uint64_t end_times[NUM_CORES];

  __global__ __aicore__ void ProfileKernel(...) {
    int idx = get_block_idx();

    start_times[idx] = GetCycles();

    // 执行工作
    do_work();

    end_times[idx] = GetCycles();
  }

  // 分析结果
  uint64_t max_time = 0;
  uint64_t total_time = 0;

  for (int i = 0; i < NUM_CORES; i++) {
    uint64_t time = end_times[i] - start_times[i];
    max_time = max(max_time, time);
    total_time += time;
  }

  float efficiency = (float)total_time / (NUM_CORES * max_time);
  printf("Core efficiency: %.2f%%\n", efficiency * 100);
#endif

7. 最佳实践

7.1 选择合适的并行模式

决策树

数据可以均匀划分?
├─ 是 → 使用 SPMD
│   └─ 数据是规则的矩阵/张量?
│       ├─ 是 → 2D/3D 划分
│       └─ 否 → 1D 划分
└─ 否 → 考虑 MPMD 或动态负载均衡
    └─ 有明显的流水线阶段?
        ├─ 是 → 使用 MPMD 流水线
        └─ 否 → 使用动态任务分配

7.2 优化检查清单

并行设计: - [ ] 选择了合适的并行模式(SPMD/MPMD) - [ ] 数据划分均匀 - [ ] 考虑了数据局部性 - [ ] 最小化核间通信

负载均衡: - [ ] 每个核心的工作量相近 - [ ] 处理了边界情况 - [ ] 测量了核心利用率

同步优化: - [ ] 只在必要时同步 - [ ] 使用细粒度同步 - [ ] 避免死锁

性能验证: - [ ] 测量了并行加速比 - [ ] 分析了核心利用率 - [ ] 识别了性能瓶颈


8. 实战案例

案例1:GEMM 多核优化

2D 划分策略

// 24 核:4×6 划分
constexpr int BLOCKS_M = 4;
constexpr int BLOCKS_N = 6;

__global__ __aicore__ void GEMMKernel(...) {
  int block_idx = get_block_idx();
  int block_m = block_idx / BLOCKS_N;
  int block_n = block_idx % BLOCKS_N;

  // 每个核心处理 M/4 × N/6 的块
  int m_start = block_m * (M / BLOCKS_M);
  int n_start = block_n * (N / BLOCKS_N);

  // 执行局部 GEMM
  local_gemm(m_start, n_start, ...);
}

性能结果: - 单核:50 TFLOPS - 24 核:1100 TFLOPS - 加速比:22× (效率 92%)

案例2:Flash Attention 多核

序列维度划分

__global__ __aicore__ void FlashAttnKernel(...) {
  int block_idx = get_block_idx();
  int seq_per_block = (SEQ_LEN + block_num - 1) / block_num;

  int seq_start = block_idx * seq_per_block;
  int seq_end = min(seq_start + seq_per_block, SEQ_LEN);

  // 每个核心处理一段序列
  // 无需核间通信
  for (int i = seq_start; i < seq_end; i += TILE_SIZE) {
    process_attention_block(i);
  }
}

参考资源