# 一个算子在深度学习框架中的旅程

``````import oneflow as torch
class TinyModel(torch.nn.Module):

def __init__(self):
super(TinyModel, self).__init__()

self.linear1 = torch.nn.Linear(100, 200)
self.activation = torch.nn.ReLU()
self.linear2 = torch.nn.Linear(200, 10)
self.softmax = torch.nn.Softmax()

def forward(self, x):
x = self.linear1(x)
x = self.activation(x)
x = self.linear2(x)
x = self.softmax(x)
return xtinymodel = TinyModel()print('The model:')print(tinymodel)``````

``output = flow.relu(input)``
``为例，梳理一个op从Python -> C++执行的完整过程。``

1

Binding

• oneflow/api/python/framework/tensor.cpp

• oneflow/api/python/framework/tensor_functions.cpp

• oneflow/core/functional/functional_api.yaml

• https://docs.python.org/zh-cn/3.8/c-api/index.html

``python/oneflow/__init__.py``

``````- name: "relu"
signature: "Tensor (Tensor x, Bool inplace=False) => Relu"
bind_python: True``````

``tools/functional/generate_functional_api.py``

• build/oneflow/core/functional/functional_api.yaml.h

• build/oneflow/core/functional/functional_api.yaml.cpp

``````class ReluFunctor {
public:
ReluFunctor() { op_ = CHECK_JUST(one::OpBuilder("relu").Input("x", 1).Output("y", 1).Build()); }
Maybe<Tensor> operator()(const std::shared_ptr<Tensor>& x, bool inplace) const {
...
}

private:
std::shared_ptr<OpExpr> op_;
};``````

ReluFunctor通过

``````ONEFLOW_FUNCTION_LIBRARY(m) {
...
}``````

## 2

Functor

Functor不仅是Python -> C++交互的核心，也是op调用、输入参数推导和检查的第一站。通常，各种op在functor层需要完成对输入tensor的shape、dtype、维度、元素个数等各种check，以及对op特有的逻辑进行解析和处理。Relu Functor代码如下：

``````class ReluFunctor {
public:
ReluFunctor() { op_ = CHECK_JUST(one::OpBuilder("relu").Input("x", 1).Output("y", 1).Build()); }
Maybe<Tensor> operator()(const std::shared_ptr<Tensor>& x, bool inplace) const {
if (inplace) {
JUST(CheckInplaceValid(x));
std::shared_ptr<TensorTuple> outputs = std::make_shared<TensorTuple>(1);
outputs->at(0) = x;
JUST(OpInterpUtil::Dispatch(*op_, {x}, outputs.get(), AttrMap{}));
return outputs->at(0);
} else {
return OpInterpUtil::Dispatch<Tensor>(*op_, {x});
}
}

private:
std::shared_ptr<OpExpr> op_;
};``````

``std::shared_ptr<OpExpr> op_;``

## Dispatch

``````class OpInterpUtil {
public:
template<typename T>
static Maybe<T> Dispatch(const OpExpr& op_expr, const TensorTuple& inputs, const AttrMap& attrs) {
return Dispatch<T>(op_expr, inputs, OpExprInterpContext(attrs));
}

template<typename T>
static Maybe<T> Dispatch(const OpExpr& op_expr, const TensorTuple& inputs) {
return Dispatch<T>(op_expr, inputs, OpExprInterpContext(AttrMap{}));
}

template<typename T>
static Maybe<T> Dispatch(const OpExpr& op_expr, const TensorTuple& inputs,
const OpExprInterpContext& ctx);

static Maybe<void> Dispatch(const OpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs, const AttrMap& attrs) {
return Dispatch(op_expr, inputs, outputs, OpExprInterpContext(attrs));
}

static Maybe<void> Dispatch(const OpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs) {
return Dispatch(op_expr, inputs, outputs, OpExprInterpContext(AttrMap{}));
}

static Maybe<void> Dispatch(const OpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs, const OpExprInterpContext& ctx);``````

``````/* static */ Maybe<void> OpInterpUtil::Dispatch(
const OpExpr& op_expr,
const TensorTuple& inputs,
TensorTuple* outputs,
const OpExprInterpContext& ctx) {
return JUST(GetInterpreter(inputs, ctx, op_expr))->Apply(op_expr, inputs, outputs, ctx);
}``````

Dispatch至此，剩下的就要交给Interpreter了。

## Get Interpreter

``````Maybe<AutogradInterpreter> GetInterpreter(const TensorTuple& inputs, const OpExprInterpContext& ctx,
const OpExpr& op_expr) {
static const auto& g_lazy_interpreter = BuildLazyInterpreter();
static const auto& g_eager_consistent_interpreter = BuildEagerInterpreter(/*is_mirrored=*/false);
static const auto& g_eager_mirrored_interpreter = BuildEagerInterpreter(/*is_mirrored=*/true);
if (!LazyMode::is_enabled()) {
if (inputs.empty()) {
if (ctx.parallel_desc.has_value()) {
JUST(ctx.nd_sbp);
CHECK_OR_RETURN(!ctx.device.has_value());
return g_eager_consistent_interpreter;
} else {
CHECK_OR_RETURN(!ctx.nd_sbp.has_value());
return g_eager_mirrored_interpreter;
}
} else {
if (inputs.at(0)->is_consistent()) {
...
return g_eager_consistent_interpreter;
} else {
...
return g_eager_mirrored_interpreter;
}
}
UNIMPLEMENTED_THEN_RETURN();
}
return g_lazy_interpreter;
}``````

• EagerMirroredInterpreter

• EagerConsistentInterpreter

• LazyInterpreter

``````std::shared_ptr<AutogradInterpreter> BuildEagerInterpreter(const bool& is_mirrored) {
std::shared_ptr<OpExprInterpreter> internal;
if (is_mirrored) {
internal = std::make_shared<EagerMirroredInterpreter>();
} else {
internal = std::make_shared<EagerConsistentInterpreter>();
}
}

auto internal = std::make_shared<LazyInterpreter>();
}``````

``````class AutogradInterpreter {
public:
AutogradInterpreter(const std::shared_ptr<OpExprInterpreter>& internal) : internal_(internal) {}

Maybe<void> Apply(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs,
const AttrMap& attrs) const {
return Apply(op_expr, inputs, outputs, OpExprInterpContext(attrs));
}

Maybe<void> Apply(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs) const {
return Apply(op_expr, inputs, outputs, OpExprInterpContext(AttrMap{}));
}

Maybe<void> Apply(const OpExpr& op_expr, const TensorTuple& inputs, TensorTuple* outputs,
const OpExprInterpContext& ctx) const;

private:
std::shared_ptr<OpExprInterpreter> internal_;
};``````

Apply()

``````Maybe<void> AutogradInterpreter::Apply(const OpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs, const OpExprInterpContext& ctx) const {
// any of input tensors requires_grad==True，则表示需要计算梯度
std::any_of(inputs.begin(), inputs.end(),
[](const std::shared_ptr<Tensor>& tensor) { return tensor->requires_grad(); });
}
// 这一坨逻辑比较丑陋，是因为近期支持了oneflow系统中支持了stride&&view机制
// 而大部分op尚未注册stride推导、尚未支持non-contiguous的输入tensor
// 所以需要在这对这部分op的输入进行强制转换，将其变为contiguous的
// NOTE: if this op not support stride, then need to tensor->contiguous()
#define HANDLE_NON_CONTIGUOUS_INPUT(tensor_tuple_ptr)
TensorTuple tmp_inputs;
if (!LazyMode::is_enabled() && !JUST(op_expr.SupportNonContiguous())) {
tmp_inputs.resize(inputs.size());
for (size_t i = 0; i < inputs.size(); i++) { tmp_inputs[i] = inputs[i]->contiguous(); }
tensor_tuple_ptr = &tmp_inputs;
}

const TensorTuple* inputs_ptr = &inputs;
HANDLE_NON_CONTIGUOUS_INPUT(inputs_ptr);

// 这里是进行实际Interpreter执行的主要过程
{
JUST(internal_->Apply(op_expr, *inputs_ptr, outputs, ctx));
}

// Lazy mode will construct backward compute graph in passes, so disable autograd if lazy mode.
auto backward_fn = std::make_shared<BackwardFunction>();
bool create_graph) -> Maybe<void> {
return Maybe<void>::Ok();
};
backward_fn->status = [=]() { return grad_closure->state()->SavedTensors().size() > 0; };
*inputs_ptr, outputs));
}
// Note: if requires_grad is True, we will create a new autograd meta for each output
// in `AddBackwardFuncPtr` to support inplace operation, so the update should after
for (auto& output : *outputs) {
...
}
}
// 捕获前向的inputs outputs，反向计算时可能用到
// Capture inputs and outputs after `AddBackwardFuncPtr` because of that grad function
// node has been attached to them.
}
return Maybe<void>::Ok();
}``````

``````// 这里是进行实际Interpreter执行的主要过程
{
JUST(internal_->Apply(op_expr, *inputs_ptr, outputs, ctx));
}``````

``````Maybe<void> EagerInterpreter::Apply(const OpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs, const OpExprInterpContext& ctx) const {
#define APPLY_IF(op_type)
if (const auto* op = dynamic_cast<const op_type##Expr*>(&op_expr)) {
return ApplyImpl(*op, inputs, outputs, ctx);
}

APPLY_IF(UserOp);
APPLY_IF(VariableOp);
APPLY_IF(CastToMirroredOp);
APPLY_IF(CastFromMirroredOp);
APPLY_IF(ConsistentToConsistentOp);
APPLY_IF(CastToConsistentOp);
APPLY_IF(CastFromConsistentOp);
APPLY_IF(DistributeSplitOp);
APPLY_IF(DistributeCloneOp);
APPLY_IF(DistributeConcatOp);
APPLY_IF(FunctionOp);
APPLY_IF(SelectTopNOp)
#undef APPLY_IF

OF_UNIMPLEMENTED() << "The type " << op_expr.op_type_name()
<< " has not been supported in EagerInterpreter::Apply.";
}``````

``````if (const auto* op = dynamic_cast<const UserOpExpr*>(&op_expr)) {
return ApplyImpl(*op, inputs, outputs, ctx);
}``````

oneflow/core/framework/op_interpreter/eager_mirrored_op_interpreter.cpp

``````Maybe<void> EagerMirroredInterpreter::ApplyImpl(const UserOpExpr& op_expr,
const TensorTuple& inputs, TensorTuple* outputs,
const OpExprInterpContext& ctx) const {
return NaiveInterpret(op_expr, inputs, outputs, ctx);
}``````

NaiveInterpret

NaiveInterpret简单来说，主要用于做以下几件事：

• check input tensor的device是否一致

• 生成output tensor

• 为output tensor推导和检查shape/stride/dtype

• 构建op执行指令，并派发至vm

``````Maybe<void> NaiveInterpret(const UserOpExpr& user_op_expr, const TensorTuple& inputs,
const Symbol<Device>& default_device, TensorTuple* outputs,
const OpExprInterpContext& ctx) {
const auto& attrs = ctx.attrs;
std::shared_ptr<EagerBlobObjectList> input_eager_blob_objects =
std::make_shared<EagerBlobObjectList>(inputs.size());
// check devices
for (int i = 0; i < inputs.size(); i++) {
const auto& input_device = JUST(inputs.at(i)->device());
if (i > 0) {
CHECK_OR_RETURN(*default_device == *input_device)
<< Error::RuntimeError()
<< "Expected all tensors to be on the same device, but found at least two devices, "
<< default_device->ToString() << " (positional 0) and " << input_device->ToString()
<< " (positional " << i << ")!";
}
input_eager_blob_objects->at(i) = JUST(inputs.at(i)->eager_blob_object());
}

// make output tensors
std::shared_ptr<EagerBlobObjectList> output_eager_blob_objects =
std::make_shared<EagerBlobObjectList>(outputs->size());
for (int i = 0; i < outputs->size(); i++) {
if (!outputs->at(i)) {
const auto& tensor_impl = std::make_shared<EagerMirroredTensorImpl>();
outputs->at(i) = std::make_shared<MirroredTensor>(tensor_impl);
output_tensor_metas->at(i) = tensor_impl->mut_tensor_meta();
} else {
bool has_eager_blob_object = JUST(outputs->at(i)->has_eager_blob_object());
CHECK_OR_RETURN(has_eager_blob_object);
output_eager_blob_objects->at(i) = JUST(outputs->at(i)->eager_blob_object());
}
}
Symbol<Stream> stream;
bool need_check_mem_case = true;

// Infer devices
...

// Infer shapes strides dtype
...

// 构建op执行指令，并派发至vm
JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {
return builder->LocalCallOpKernel(kernel, input_eager_blob_objects, output_eager_blob_objects,
ctx, stream);
}));
return Maybe<void>::Ok();
}``````

Interpreter的终点是虚拟机（vm）。vm部分，是OneFlow比较独特的设计，内容很多，这里暂不展开了：） 可以简单理解，派发至vm后，此op将进入一个任务执行的队列，将会等待其vm的调度、执行。

## Compute

``oneflow/core/eager/opkernel_instruction_type.cpp``

``````static inline void OpKernelCompute(
LocalCallOpKernelPhyInstrOperand* operand,
DeviceCtx* device_ctx, user_op::OpKernelState* state,
const user_op::OpKernelCache* cache) {

auto* opkernel = operand->mut_opkernel();
auto* compute_ctx =
opkernel->UpdateComputeContext(operand->inputs().get(), operand->outputs().get(),
operand->consistent_tensor_infer_result().get(), device_ctx);
...
operand->user_opkernel()->Compute(compute_ctx, state, cache);
opkernel->UpdateComputeContext(nullptr, nullptr, nullptr, nullptr);
}``````

``operand->user_opkernel()->Compute(compute_ctx, state, cache);``

``oneflow/user/kernels/xxx_kernel.cpp``

``oneflow/user/kernels/xxx_kernel.cu``

### UnaryPrimitiveKernel

``````class UnaryPrimitiveKernel final : public user_op::OpKernel, public user_op::CudaGraphSupport {
public:
OF_DISALLOW_COPY_AND_MOVE(UnaryPrimitiveKernel);
UnaryPrimitiveKernel() = default;
~UnaryPrimitiveKernel() = default;

using PrimitiveFactoryFuncType = std::function<std::unique_ptr<ep::primitive::ElementwiseUnary>(
user_op::KernelComputeContext*)>;

UnaryPrimitiveKernel(const std::string& output_name, const std::string& input_name,
PrimitiveFactoryFuncType fn)
: output_name_(output_name),
input_name_(input_name),
primitive_factory_func_(std::move(fn)) {}

private:
using user_op::OpKernel::Compute;
void Compute(user_op::KernelComputeContext* ctx) const override {
auto primitive = primitive_factory_func_(ctx);
CHECK(primitive);

const user_op::Tensor* input_tensor = ctx->Tensor4ArgNameAndIndex(input_name_, 0);
...
const int64_t elem_cnt = input_shape.elem_cnt();

if (elem_cnt != 0) {
primitive->Launch(ctx->stream(), input_tensor->dptr(), output_tensor->mut_dptr(), elem_cnt);
}
}
bool AlwaysComputeWhenAllOutputsEmpty() const override { return false; }

std::string output_name_;
std::string input_name_;
PrimitiveFactoryFuncType primitive_factory_func_;
};``````

### ep::primitive::ElementwiseUnary

``````template<UnaryOp unary_op, typename Src, typename Dst>
class ElementwiseUnaryImpl : public ElementwiseUnary {
public:
OF_DISALLOW_COPY_AND_MOVE(ElementwiseUnaryImpl);
ElementwiseUnaryImpl(Scalar attr0, Scalar attr1) : attr0(attr0), attr1(attr1) {}
~ElementwiseUnaryImpl() override = default;

void Launch(Stream* stream, const void* src_ptr, void* dst_ptr, size_t count) override {
CpuStream* cpu_stream = stream->As<CpuStream>();

Dst* dst = reinterpret_cast<Dst*>(dst_ptr);
const Src* src = reinterpret_cast<const Src*>(src_ptr);
auto functor = UnaryFunctor<DeviceType::kCPU, unary_op, Dst, Src>(attr0, attr1);
cpu_stream->ParallelFor(0, count, [functor, src, dst](int64_t begin, int64_t end) {
for (int64_t i = begin; i < end; i++) { dst[i] = functor(src[i]); }
});
}

protected:
Scalar attr0, attr1;
};``````

### 这个UnaryFuntor根据不同的Unaray op类型，特化出不同的具体functor实现，具体到Relu op，其实现位于

oneflow/core/ep/common/primitive/unary_functor.h：

``````template<DeviceType device, typename Dst, typename Src>
struct UnaryFunctor<device, UnaryOp::kRelu, Dst, Src> {
UnaryFunctor(Scalar attr0, Scalar attr1) {}

OF_DEVICE_FUNC Dst operator()(Src src) const {
const Src zero_val = static_cast<Src>(0.0);
if (src <= zero_val) {
return static_cast<Dst>(zero_val);
} else {
return static_cast<Dst>(src);
}
}
};``````

（参考代码：

https://github.com/Oneflow-Inc/oneflow/commit/1dbdf8faed988fa7fd1a9034a4d79d5caf18512d）

THE END