聊聊访问者模式与其在Calcite/Flink中的应用

前言

“访问者模式”在之前的文章里已经出现过几次了,择日不如撞日,今天接着聊吧。

The Visitor Pattern

访问者模式属于GoF设计模式分类中的行为型模式。它的定义如下:

Represents an operation to be performed on the elements of an object structure. Visitor lets you define a new operation without changing the classes of the elements on which it operates.
表示一个作用于某对象结构中的各元素的操作。它使你可以在不改变各元素类的前提下定义作用于这些元素的新操作

访问者模式的应用不甚广泛,但是却是特定场景下的银弹,即:对象的结构比较稳定,但经常需要对结构中的不同对象种类定义很多不同且不相关的操作。这样说可能不太形象,举个栗子。

一辆车由很多部件组成,如车身、引擎、轮子等。当你买到了一辆新车,肯定会先将它的各个部件仔细检视一遍,例如打开大灯、按一按喇叭等等。访问者模式就很适合描述检视新车的逻辑,因为车的部件虽然多但是相对固定,并且对于每个部件的操作不同。先贴出UML类图:

汽车元素可以抽象为接口CarElement,具体实现的元素类如WheelEngineBody。要使访问者发挥作用,它们内部必须定义一个接受访问者访问的方法accept(CarElementVisitor)CarElementVisitor接口则定义了汽车元素的访问者,其内部对不同类型的CarElement分别定义了重载的visit()方法。通过实现CarElementVisitor就可以实现不同的操作。具体代码如下:

interface CarElement {
    void accept(CarElementVisitor visitor);
}

interface CarElementVisitor {
    void visit(Body body);
    void visit(Car car);
    void visit(Engine engine);
    void visit(Wheel wheel);
}

class Wheel implements CarElement {
  private final String name;

  public Wheel(final String name) {
      this.name = name;
  }

  public String getName() {
      return name;
  }

  @Override
  public void accept(CarElementVisitor visitor) {
      visitor.visit(this);
  }
}

class Body implements CarElement {
  @Override
  public void accept(CarElementVisitor visitor) {
      visitor.visit(this);
  }
}

class Engine implements CarElement {
  @Override
  public void accept(CarElementVisitor visitor) {
      visitor.visit(this);
  }
}

class Car implements CarElement {
    private final List<CarElement> elements;

    public Car() {
        this.elements = List.of(
            new Wheel("front left"), new Wheel("front right"),
            new Wheel("back left"), new Wheel("back right"),
            new Body(), new Engine()
        );
    }

    @Override
    public void accept(CarElementVisitor visitor) {
        for (CarElement element : elements) {
            element.accept(visitor);
        }
        visitor.visit(this);
    }
}

class CarElementDoVisitor implements CarElementVisitor {
    @Override
    public void visit(Body body) {
        System.out.println("Moving my body");
    }

    @Override
    public void visit(Car car) {
        System.out.println("Starting my car");
    }

    @Override
    public void visit(Wheel wheel) {
        System.out.println("Kicking my " + wheel.getName() + " wheel");
    }

    @Override
    public void visit(Engine engine) {
        System.out.println("Starting my engine");
    }
}

class CarElementPrintVisitor implements CarElementVisitor {
    @Override
    public void visit(Body body) {
        System.out.println("Visiting body");
    }

    @Override
    public void visit(Car car) {
        System.out.println("Visiting car");
    }

    @Override
    public void visit(Engine engine) {
        System.out.println("Visiting engine");
    }

    @Override
    public void visit(Wheel wheel) {
        System.out.println("Visiting " + wheel.getName() + " wheel");
    }
}

public class VisitorDemo {
    public static void main(final String[] args) {
        Car car = new Car();

        car.accept(new CarElementPrintVisitor());
        car.accept(new CarElementDoVisitor());
    }
}

注意汽车本身Car也实现了CarElement,它是各个元素的复合对象,因此它的accept()方法内要遍历调用子元素的accept()方法。由于Java和其他多数高级语言支持动态分派(dynamic dispatch),故能够保证visitor.visit(this)语句在执行时找到正确的visit()方法的重载。

这段程序的输出是:

Visiting front left wheel
Visiting front right wheel
Visiting back left wheel
Visiting back right wheel
Visiting body
Visiting engine
Visiting car
Kicking my front left wheel
Kicking my front right wheel
Kicking my back left wheel
Kicking my back right wheel
Moving my body
Starting my engine
Starting my car

通过上面的例子,我们可以抽象出访问者模式的通用实现,如下图所示。

访问者模式的主要优点是符合单一职责原则,容易理解、易于扩展、方便解耦;主要缺点则是不符合依赖倒置原则,不适用于不稳定的结构,元素类发生变化时会导致大量修改。

Real-World Application

访问者模式在SQL化的数据处理引擎中相对比较常见。这里简单聊聊Calcite里的两个应用:RelVisitorRelShuttle

RelVisitor

顾名思义,RelVisitor就是访问RelNode的访问者。RelNode显然符合前文所述的对象结构稳定性和操作多变性的特征。但是RelVisitor的默认实现非常松散,全部的代码只有以下这点。

public abstract class RelVisitor {
  private RelNode root;

  public void visit(
      RelNode node,
      int ordinal,
      RelNode parent) {
    node.childrenAccept(this);
  }

  public void replaceRoot(RelNode node) {
    this.root = node;
  }

  public RelNode go(RelNode p) {
    this.root = p;
    visit(p, 0, null);
    return root;
  }
}

以及AbstractRelNode中定义的接受访问的方法childrenAccept()(accept()另有他用)。

public void childrenAccept(RelVisitor visitor) {
  List<RelNode> inputs = getInputs();
  for (int i = 0; i < inputs.size(); i++) {
    visitor.visit(inputs.get(i), i, this);
  }
}

这就意味着我们使用RelVisitor时要手动处理RelNode的具体实例类型。但由于Java支持instanceof运算符,Scala更支持多类型模式匹配,所以似乎也不算什么麻烦事。

举个简单的栗子,在Flink SQL产生的逻辑计划树中抽取数据流源表和维表源表。

class SourceTableRelVisitor extends RelVisitor {
  private val sourceStreamTables = new util.ArrayList[String]()
  private val lookupJoinTables = new util.ArrayList[String]()

  override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
    node match {
      case tableScanNode: TableScan =>
        val table = tableScanNode.getTable.getQualifiedName.mkString(".")
        if (parent != null && parent.isInstanceOf[Snapshot]) {
          lookupJoinTables.add(table)
        } else {
          sourceStreamTables.add(table)
        }
      case node: RelNode =>
        node.childrenAccept(this)
    }
  }
}

// new SourceTableRelVisitor().go(relNode)

RelShuttle

RelShuttle可以视为RelVisitor的加强版,其接口定义如下。

可见,它不仅像标准的访问者模式一样定义了不同类型的visit()方法,并且这些方法还可以返回新的RelNode,也就是能在访问的过程中做修改。来看一下默认的实现RelShuttleImpl就明白了。

public class RelShuttleImpl implements RelShuttle {
  protected final Deque<RelNode> stack = new ArrayDeque<>();

  protected RelNode visitChild(RelNode parent, int i, RelNode child) {
    stack.push(parent);
    try {
      // RelNode#accept()方法留给了RelShuttle
      RelNode child2 = child.accept(this);
      // 如果调用accept()方法之后,RelNode发生了变化...
      if (child2 != child) {
        final List<RelNode> newInputs = new ArrayList<>(parent.getInputs());
        // ...则需要修改父节点的inputs,并生成一个新副本
        newInputs.set(i, child2);
        return parent.copy(parent.getTraitSet(), newInputs);
      }
      return parent;
    } finally {
      stack.pop();
    }
  }

  protected RelNode visitChildren(RelNode rel) {
    for (Ord<RelNode> input : Ord.zip(rel.getInputs())) {
      rel = visitChild(rel, input.i, input.e);
    }
    return rel;
  }

  // 以下分别处理不同类型的RelNode
  public RelNode visit(LogicalAggregate aggregate) {
    return visitChild(aggregate, 0, aggregate.getInput());
  }

  public RelNode visit(LogicalMatch match) {
    return visitChild(match, 0, match.getInput());
  }

  // ... 略 ...

  public RelNode visit(RelNode other) {
    return visitChildren(other);
  }
}

仍然用Flink SQL来举例子,ExpandTableScanShuttle这个RelShuttle负责把视图(QueryOperationCatalogViewTable)和子查询(RexSubQuery)展开成正常的、包含底表TableScan节点的RelNode。其完整实现如下。

class ExpandTableScanShuttle extends RelShuttleImpl {
  override def visitChild(parent: RelNode, i: Int, child: RelNode): RelNode = {
    stack.push(parent)
    try {
      val child2 = child.accept(this)
      if (child2 ne child) {
        parent.replaceInput(i, child2)
      }
      parent
    } finally {
      stack.pop
    }
  }

  override def visit(filter: LogicalFilter): RelNode = {
    val newCondition = filter.getCondition.accept(new ExpandTableScanInSubQueryShuttle)
    if (newCondition ne filter.getCondition) {
      val newFilter = filter.copy(filter.getTraitSet, filter.getInput, newCondition)
      super.visit(newFilter)
    } else {
      super.visit(filter)
    }
  }

  override def visit(project: LogicalProject): RelNode = {
    val shuttle = new ExpandTableScanInSubQueryShuttle
    var changed = false
    val newProjects = project.getProjects.map {
      project =>
        val newProject = project.accept(shuttle)
        if (newProject ne project) {
          changed = true
        }
        newProject
    }
    if (changed) {
      val newProject = project.copy(
        project.getTraitSet, project.getInput, newProjects, project.getRowType)
      super.visit(newProject)
    } else {
      super.visit(project)
    }
  }

  override def visit(join: LogicalJoin): RelNode = {
    val newCondition = join.getCondition.accept(new ExpandTableScanInSubQueryShuttle)
    if (newCondition ne join.getCondition) {
      val newJoin = join.copy(
        join.getTraitSet, newCondition, join.getLeft, join.getRight,
        join.getJoinType, join.isSemiJoinDone)
      super.visit(newJoin)
    } else {
      super.visit(join)
    }
  }

  class ExpandTableScanInSubQueryShuttle extends RexShuttle {
    override def visitSubQuery(subQuery: RexSubQuery): RexNode = {
      val newRel = subQuery.rel.accept(ExpandTableScanShuttle.this)
      var changed = false
      val newOperands = subQuery.getOperands.map { op =>
        val newOp = op.accept(ExpandTableScanInSubQueryShuttle.this)
        if (op ne newOp) {
          changed = true
        }
        newOp
      }

      var newSubQuery = subQuery
      if (newRel ne newSubQuery.rel) {
        newSubQuery = newSubQuery.clone(newRel)
      }
      if (changed) {
        newSubQuery = newSubQuery.clone(newSubQuery.getType, newOperands)
      }
      newSubQuery
    }
  }

  override def visit(scan: TableScan): RelNode = {
    scan match {
      case tableScan: LogicalTableScan =>
        val viewTable = tableScan.getTable.unwrap(classOf[QueryOperationCatalogViewTable])
        if (viewTable != null) {
          val rel = viewTable.toRel(ViewExpanders.simpleContext(tableScan.getCluster))
          rel.accept(this)
        } else {
          tableScan
        }
      case otherScan => otherScan
    }
  }
}

时间已经比较晚了,不多说废话,提几个需要注意的点:

  • RexShuttle的含义与RelShuttle基本相同,只是处理的元素变成了RexNode
  • 这里的“子查询”RexSubQuery实际上指的是这些操作:IN / NOT IN / EXISTS / NOT EXISTS,以及标量子查询(即形如WHERE a = (SELECT a FROM t WHERE b = 'c')的子查询)。所以除了LogicalProject之外,还要额外处理LogicalFilterLogicalJoin的条件。
  • 表和视图的扫描操作都是用LogicalTableScan节点来表示的,但视图展开成底表之后必须保证LogicalTableScan的输出完全相同。所以还得重写visitChild()方法,在子节点发生变化时,只调用parent.replaceInput()替换掉对应的子节点,而不是调用copy()产生一个新节点。

The End

晚安。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>