A Look at Flink Table's select Operation
Published: 2019-06-18


This article takes a look at the select operation of Flink's Table.

Table.select

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }

  def replaceAggFunctionCall(field: Expression, tableEnv: TableEnvironment): Expression = {
    field match {
      case l: LeafExpression => l

      case u: UnaryExpression =>
        val c = replaceAggFunctionCall(u.child, tableEnv)
        u.makeCopy(Array(c))

      case b: BinaryExpression =>
        val l = replaceAggFunctionCall(b.left, tableEnv)
        val r = replaceAggFunctionCall(b.right, tableEnv)
        b.makeCopy(Array(l, r))

      // Functions calls
      case c @ Call(name, args) =>
        val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
        function match {
          case a: AggFunctionCall => a
          case a: Aggregation => a
          case p: AbstractWindowProperty => p
          case _ =>
            val newArgs =
              args.map(
                (exp: Expression) =>
                  replaceAggFunctionCall(exp, tableEnv))
            c.makeCopy(Array(name, newArgs))
        }

      // Scala functions
      case sfc @ ScalarFunctionCall(clazz, args) =>
        val newArgs: Seq[Expression] =
          args.map(
            (exp: Expression) =>
              replaceAggFunctionCall(exp, tableEnv))
        sfc.makeCopy(Array(clazz, newArgs))

      // Array constructor
      case c @ ArrayConstructor(args) =>
        val newArgs =
          c.elements
            .map((exp: Expression) => replaceAggFunctionCall(exp, tableEnv))
        c.makeCopy(Array(newArgs))

      // Other expressions
      case e: Expression => e
    }
  }

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(fields, logicalPlan, tableEnv)
    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableEnv)
    if (propNames.nonEmpty) {
      throw new ValidationException("Window properties can only be used on windowed tables.")
    }
    if (aggNames.nonEmpty) {
      val projectsOnAgg = replaceAggregationsAndProperties(
        expandedFields, tableEnv, aggNames, propNames)
      val projectFields = extractFieldReferences(expandedFields)
      new Table(tableEnv,
        Project(projectsOnAgg,
          Aggregate(Nil, aggNames.map(a => Alias(a._1, a._2)).toSeq,
            Project(projectFields, logicalPlan).validate(tableEnv)
          ).validate(tableEnv)
        ).validate(tableEnv)
      )
    } else {
      new Table(tableEnv,
        Project(expandedFields.map(UnresolvedAlias), logicalPlan).validate(tableEnv))
    }
  }

  //......
}
  • Table provides two select methods: one takes a String argument and the other takes Expression arguments (a minimal usage sketch follows this list)
  • The String variant first parses the string with ExpressionParser.parseExpressionList, then replaces UDAGG function calls via replaceAggFunctionCall, and finally delegates to the Expression variant of select
  • The Expression variant rebuilds the Table around a Project; if aggregations are present it creates an Aggregate first and then wraps it in a Project
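A minimal usage sketch of the two variants, assuming a batch environment with the hypothetical fields id, name and amount; it is illustrative only and not part of the Flink source quoted above:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object SelectDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // hypothetical data and field names, for illustration only
    val ds = env.fromElements((1, "a", 10L), (2, "b", 20L))
    val table = tEnv.fromDataSet(ds, 'id, 'name, 'amount)

    // String variant: goes through ExpressionParser.parseExpressionList and replaceAggFunctionCall
    val t1 = table.select("id, amount")

    // Expression variant: the Scala DSL builds the Expression trees directly
    val t2 = table.select('id, 'amount as 'total)

    tEnv.toDataSet[Row](t2).print()
  }
}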

Expression

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/Expression.scala

abstract class Expression extends TreeNode[Expression] {
  /**
    * Returns the [[TypeInformation]] for evaluating this expression.
    * It is sometimes not available until the expression is valid.
    */
  private[flink] def resultType: TypeInformation[_]

  /**
    * One pass validation of the expression tree in post order.
    */
  private[flink] lazy val valid: Boolean = childrenValid && validateInput().isSuccess

  private[flink] def childrenValid: Boolean = children.forall(_.valid)

  /**
    * Check input data types, inputs number or other properties specified by this expression.
    * Return `ValidationSuccess` if it pass the check,
    * or `ValidationFailure` with supplement message explaining the error.
    * Note: we should only call this method until `childrenValid == true`
    */
  private[flink] def validateInput(): ValidationResult = ValidationSuccess

  /**
    * Convert Expression to its counterpart in Calcite, i.e. RexNode
    */
  private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
    throw new UnsupportedOperationException(
      s"${this.getClass.getName} cannot be transformed to RexNode"
    )

  private[flink] def checkEquals(other: Expression): Boolean = {
    if (this.getClass != other.getClass) {
      false
    } else {
      def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
        elements1.length == elements2.length && elements1.zip(elements2).forall {
          case (e1: Expression, e2: Expression) => e1.checkEquals(e2)
          case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
          case (i1, i2) => i1 == i2
        }
      }
      val elements1 = this.productIterator.toSeq
      val elements2 = other.productIterator.toSeq
      checkEquality(elements1, elements2)
    }
  }
}

abstract class BinaryExpression extends Expression {
  private[flink] def left: Expression
  private[flink] def right: Expression
  private[flink] def children = Seq(left, right)
}

abstract class UnaryExpression extends Expression {
  private[flink] def child: Expression
  private[flink] def children = Seq(child)
}

abstract class LeafExpression extends Expression {
  private[flink] val children = Nil
}
  • Expression extends TreeNode and has three abstract subclasses: BinaryExpression, UnaryExpression, and LeafExpression (see the DSL sketch below)
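As a rough, assumed illustration, the Scala expression DSL in org.apache.flink.table.api.scala builds exactly these kinds of nodes; the mappings in the comments are given for orientation only:

import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Expression

object ExpressionDslSketch {
  // implicit conversions turn Scala symbols and operators into Expression trees
  val leaf: Expression   = 'amount            // field reference  -> a LeafExpression
  val unary: Expression  = -'amount           // unary minus      -> a UnaryExpression
  val binary: Expression = 'price * 'quantity // multiplication   -> a BinaryExpression
}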

Project

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Project(
    projectList: Seq[NamedExpression],
    child: LogicalNode,
    explicitAlias: Boolean = false)
  extends UnaryNode {

  override def output: Seq[Attribute] = projectList.map(_.toAttribute)

  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project]
    val newProjectList =
      afterResolve.projectList.zipWithIndex.map { case (e, i) =>
        e match {
          case u @ UnresolvedAlias(c) => c match {
            case ne: NamedExpression => ne
            case expr if !expr.valid => u
            case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
            case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
            case other => Alias(other, s"_c$i")
          }
          case _ =>
            throw new RuntimeException("This should never be called and probably points to a bug.")
        }
    }
    Project(newProjectList, child, explicitAlias)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    val resolvedProject = super.validate(tableEnv).asInstanceOf[Project]
    val names: mutable.Set[String] = mutable.Set()

    def checkName(name: String): Unit = {
      if (names.contains(name)) {
        failValidation(s"Duplicate field name $name.")
      } else {
        names.add(name)
      }
    }

    resolvedProject.projectList.foreach {
      case n: Alias =>
        // explicit name
        checkName(n.name)
      case r: ResolvedFieldReference =>
        // simple field forwarding
        checkName(r.name)
      case _ => // Do nothing
    }
    resolvedProject
  }

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    val exprs = if (explicitAlias) {
      projectList
    } else {
      // remove AS expressions, according to Calcite they should not be in a final RexNode
      projectList.map {
        case Alias(e: Expression, _, _) => e
        case e: Expression => e
      }
    }
    relBuilder.project(
      exprs.map(_.toRexNode(relBuilder)).asJava,
      projectList.map(_.name).asJava,
      true)
  }
}
  • Project extends UnaryNode; its constructor takes three parameters: Seq[NamedExpression], LogicalNode, and explicitAlias, where explicitAlias is optional and defaults to false (a small sketch follows)
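Continuing the hypothetical table from the earlier sketch, a quick illustration of the plain-projection path and the duplicate-name check; the error text is paraphrased from Project.validate:

// a select without aggregations becomes a single Project over the current logicalPlan
val projected = table.select('id, 'name)

// Project.validate rejects duplicate output names, e.g.:
// table.select('id as 'x, 'name as 'x)
// => org.apache.flink.table.api.ValidationException: Duplicate field name x.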

Aggregate

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalNode) extends UnaryNode {

  override def output: Seq[Attribute] = {
    (groupingExpressions ++ aggregateExpressions) map {
      case ne: NamedExpression => ne.toAttribute
      case e => Alias(e, e.toString).toAttribute
    }
  }

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.aggregate(
      relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
      aggregateExpressions.map {
        case Alias(agg: Aggregation, name, _) => agg.toAggCall(name)(relBuilder)
        case _ => throw new RuntimeException("This should never happen.")
      }.asJava)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
    val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
    val groupingExprs = resolvedAggregate.groupingExpressions
    val aggregateExprs = resolvedAggregate.aggregateExpressions
    aggregateExprs.foreach(validateAggregateExpression)
    groupingExprs.foreach(validateGroupingExpression)

    def validateAggregateExpression(expr: Expression): Unit = expr match {
      case distinctExpr: DistinctAgg =>
        distinctExpr.child match {
          case _: DistinctAgg => failValidation(
            "Chained distinct operators are not supported!")
          case aggExpr: Aggregation => validateAggregateExpression(aggExpr)
          case _ => failValidation(
            "Distinct operator can only be applied to aggregation expressions!")
        }
      // check aggregate function
      case aggExpr: Aggregation
        if aggExpr.getSqlAggFunction.requiresOver =>
        failValidation(s"OVER clause is necessary for window functions: [${aggExpr.getClass}].")
      // check no nested aggregation exists.
      case aggExpr: Aggregation =>
        aggExpr.children.foreach { child =>
          child.preOrderVisit {
            case agg: Aggregation =>
              failValidation(
                "It's not allowed to use an aggregate function as " +
                  "input of another aggregate function")
            case _ => // OK
          }
        }
      case a: Attribute if !groupingExprs.exists(_.checkEquals(a)) =>
        failValidation(
          s"expression '$a' is invalid because it is neither" +
            " present in group by nor an aggregate function")
      case e if groupingExprs.exists(_.checkEquals(e)) => // OK
      case e => e.children.foreach(validateAggregateExpression)
    }

    def validateGroupingExpression(expr: Expression): Unit = {
      if (!expr.resultType.isKeyType) {
        failValidation(
          s"expression $expr cannot be used as a grouping expression " +
            "because it's not a valid key type which must be hashable and comparable")
      }
    }

    resolvedAggregate
  }
}
  • Aggregate extends UnaryNode; its constructor takes three parameters: Seq[Expression], Seq[NamedExpression], and LogicalNode (see the sketch below)
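Again using the hypothetical table from the first sketch, this is roughly how select reaches the Aggregate path, together with one of the validation rules shown above:

// an aggregation inside select (without groupBy) yields Aggregate(Nil, ...) wrapped in a Project
val total = table.select('amount.sum as 'total)

// Aggregate.validate forbids nesting aggregations, e.g.:
// table.select('amount.sum.max)
// => "It's not allowed to use an aggregate function as input of another aggregate function"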

LogicalNode

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/LogicalNode.scala

abstract class LogicalNode extends TreeNode[LogicalNode] {
  def output: Seq[Attribute]

  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    // resolve references and function calls
    val exprResolved = expressionPostOrderTransform {
      case u @ UnresolvedFieldReference(name) =>
        // try resolve a field
        resolveReference(tableEnv, name).getOrElse(u)
      case c @ Call(name, children) if c.childrenValid =>
        tableEnv.getFunctionCatalog.lookupFunction(name, children)
    }

    exprResolved.expressionPostOrderTransform {
      case ips: InputTypeSpec if ips.childrenValid =>
        var changed: Boolean = false
        val newChildren = ips.expectedTypes.zip(ips.children).map { case (tpe, child) =>
          val childType = child.resultType
          if (childType != tpe && TypeCoercion.canSafelyCast(childType, tpe)) {
            changed = true
            Cast(child, tpe)
          } else {
            child
          }
        }.toArray[AnyRef]
        if (changed) ips.makeCopy(newChildren) else ips
    }
  }

  final def toRelNode(relBuilder: RelBuilder): RelNode = construct(relBuilder).build()

  protected[logical] def construct(relBuilder: RelBuilder): RelBuilder

  def validate(tableEnv: TableEnvironment): LogicalNode = {
    val resolvedNode = resolveExpressions(tableEnv)
    resolvedNode.expressionPostOrderTransform {
      case a: Attribute if !a.valid =>
        val from = children.flatMap(_.output).map(_.name).mkString(", ")
        // give helpful error message for null literals
        if (a.name == "null") {
          failValidation(s"Cannot resolve field [${a.name}] given input [$from]. If you want to " +
            s"express a null literal, use 'Null(TYPE)' for typed null expressions. " +
            s"For example: Null(INT)")
        } else {
          failValidation(s"Cannot resolve field [${a.name}] given input [$from].")
        }
      case e: Expression if e.validateInput().isFailure =>
        failValidation(s"Expression $e failed on input check: " +
          s"${e.validateInput().asInstanceOf[ValidationFailure].message}")
    }
  }

  /**
    * Resolves the given strings to a [[NamedExpression]] using the input from all child
    * nodes of this LogicalPlan.
    */
  def resolveReference(tableEnv: TableEnvironment, name: String): Option[NamedExpression] = {
    // try to resolve a field
    val childrenOutput = children.flatMap(_.output)
    val fieldCandidates = childrenOutput.filter(_.name.equalsIgnoreCase(name))
    if (fieldCandidates.length > 1) {
      failValidation(s"Reference $name is ambiguous.")
    } else if (fieldCandidates.nonEmpty) {
      return Some(fieldCandidates.head.withName(name))
    }

    // try to resolve a table
    tableEnv.scanInternal(Array(name)) match {
      case Some(table) => Some(TableReference(name, table))
      case None => None
    }
  }

  /**
    * Runs [[postOrderTransform]] with `rule` on all expressions present in this logical node.
    *
    * @param rule the rule to be applied to every expression in this logical node.
    */
  def expressionPostOrderTransform(rule: PartialFunction[Expression, Expression]): LogicalNode = {
    var changed = false

    def expressionPostOrderTransform(e: Expression): Expression = {
      val newExpr = e.postOrderTransform(rule)
      if (newExpr.fastEquals(e)) {
        e
      } else {
        changed = true
        newExpr
      }
    }

    val newArgs = productIterator.map {
      case e: Expression => expressionPostOrderTransform(e)
      case Some(e: Expression) => Some(expressionPostOrderTransform(e))
      case seq: Traversable[_] => seq.map {
        case e: Expression => expressionPostOrderTransform(e)
        case other => other
      }
      case r: Resolvable[_] => r.resolveExpressions(e => expressionPostOrderTransform(e))
      case other: AnyRef => other
    }.toArray

    if (changed) makeCopy(newArgs) else this
  }

  protected def failValidation(msg: String): Nothing = {
    throw new ValidationException(msg)
  }
}

abstract class LeafNode extends LogicalNode {
  override def children: Seq[LogicalNode] = Nil
}

abstract class UnaryNode extends LogicalNode {
  def child: LogicalNode
  override def children: Seq[LogicalNode] = child :: Nil
}

abstract class BinaryNode extends LogicalNode {
  def left: LogicalNode
  def right: LogicalNode
  override def children: Seq[LogicalNode] = left :: right :: Nil
}
  • LogicalNode, like Expression, extends TreeNode; it has three abstract subclasses: BinaryNode, UnaryNode, and LeafNode (a small sketch follows)
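As a final, assumed illustration based on the same hypothetical table: the resolved and validated LogicalNode tree is what eventually gets translated into a Calcite RelNode, which can be observed through explain:

// the validated LogicalNode tree is turned into a Calcite RelNode via toRelNode
// when the table is explained or executed
println(tEnv.explain(table.select('id)))

// an unknown field fails in LogicalNode.validate, roughly:
// table.select('doesNotExist)
// => "Cannot resolve field [doesNotExist] given input [id, name, amount]."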

Summary

  • Table provides two select methods: one takes a String argument and the other takes Expression arguments; the String variant first parses the string with ExpressionParser.parseExpressionList, then replaces UDAGG function calls via replaceAggFunctionCall, and finally delegates to the Expression variant
  • The Expression variant rebuilds the Table around a Project; if aggregations are present it creates an Aggregate first and then wraps it in a Project
  • Project and Aggregate are both case classes extending UnaryNode, a subclass of LogicalNode; LogicalNode, like Expression, extends TreeNode; Expression has three abstract subclasses (BinaryExpression, UnaryExpression, LeafExpression), and LogicalNode likewise has three (BinaryNode, UnaryNode, LeafNode)

doc

