欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

动态代码生成技术在 Presto 中使用简介

《ASM 与 Presto 动态代码生成简介》这篇文章中,我们简单介绍了 Presto 动态代码生成的原理以及 Presto 在计算表达式的地方会使用到动态代码生成技术。为了加深理解,本文将以两个例子介绍 Presto 里面动态代码生成的使用。

EmbedVersion

我们往 Presto 提交 SQL 查询以及 TaskExecutor 启动 TaskRunner 执行 Task 的时候都会使用到 EmbedVersion 类里面的 embedVersion 方法。embedVersion 方法其实就是初始化一个 Runnable 实例,比如启动 TaskRunner 的代码片段如下:

executor.execute(embedVersion.embedVersion(new TaskRunner()));

其中 TaskRunner 就是实现 Runnable 接口的。EmbedVersion 的 embedVersion 方法实现如下:

public Runnable embedVersion(Runnable runnable)
{
    requireNonNull(runnable, "runnable is null");
    try {
        return (Runnable) runnableConstructor.invoke(runnable);
    }
    catch (Throwable throwable) {
        throwIfUnchecked(throwable);
        throw new RuntimeException(throwable);
    }
}

其中 runnableConstructor 就是使用 ASM 进行代码生成的类,实现如下:

// 这里定义了一个类,类名大概为 Presto_null__testversion____20211011_105831_1,
// 它的父类是 Object,并实现了 Runnable 接口。
ClassDefinition classDefinition = new ClassDefinition(
    a(PUBLIC, FINAL),
    makeClassName(baseClassName(serverConfig)),
    type(Object.class),
    type(Runnable.class));

// 定义了一个名为 runnable 的局部变量,类型为 Runnable
FieldDefinition field = classDefinition.declareField(a(PRIVATE), "runnable", Runnable.class);

Parameter parameter = arg("runnable", type(Runnable.class));
// 定义了这个类的构造函数,参数为 runnable,参数类型为 Runnable
MethodDefinition constructor = classDefinition.declareConstructor(a(PUBLIC), parameter);
// 构造方法里面其实就是把参数 runnable 的值赋值给局部变量 runnable
constructor.getBody()
    .comment("super(runnable);")
    .append(constructor.getThis())
    .invokeConstructor(Object.class)
    .append(constructor.getThis())
    .append(parameter)
    .putField(field)
    .ret();

// 定义了一个名为 run 的方法,事实上就是实现 Runnable 接口里面的 run 方法
MethodDefinition run = classDefinition.declareMethod(a(PUBLIC), "run", type(void.class));
// run 里面其实就是调用局部变量 runnable 的 run 方法
run.getBody()
    .comment("runnable.run();")
    .append(run.getThis())
    .getField(field)
    .invokeInterface(Runnable.class, "run", void.class)
    .ret();

// 定义这个类,并加载到 ClassLoader 中
Class<? extends Runnable> generatedClass = defineClass(classDefinition, Runnable.class, ImmutableMap.of(), getClass().getClassLoader());
this.runnableConstructor = constructorMethodHandle(generatedClass, Runnable.class);

上面是 Presto 操作 Java 字节码并动态生成了一个类,其生成的类大概如下面所示:

package com.facebook.presto.$gen;

public final class Presto_null__testversion____20211011_105831_1 implements Runnable {
    private Runnable runnable;

    public Presto_null__testversion____20211011_105831_1(Runnable runnable) {
        this.runnable = runnable;
    }

    public void run() {
        this.runnable.run();
    }
}

看起来内容其实很简单。EmbedVersion 类算是 Presto 里面动态代码生成最简单的例子了。

concat 函数实现

下面我们来看下稍微复杂的,也就是 Presto 里面内置函数的实现。Presto 的内置函数的实现很多也是用到代码生成技术,比如 map_filter、transform_keys 以及 transform_values 等。我们这里也举一个比较简单的例子,也就是 concat 函数的实现。比如下面的 SQL 查询:

select concat(o_orderstatus, o_orderpriority) from orders limit 10;

在 Presto 里面,concat 函数的实现就是通过代码生成进行的,其实现代码可以参见 com.facebook.presto.operator.scalar.ConcatFunction。Presto 接收到上面的 SQL 查询后,会在 Coordinator 端进行解析,并生成相应的 Tasks,提交给 Worker 执行。在 Worker 端,执行 Task 的时候,会调用 LocalExecutionPlanner 的 plan 方法生成 LocalExecutionPlan 其实就是本地可执行的计划,在 plan 方法里面会调用 com.facebook.presto.sql.planner.LocalExecutionPlanner.Visitor 对 Coordinator 传过来的 PlanNode 进行变量生成 PhysicalOperation。在我们的例子中,会在 com.facebook.presto.sql.planner.LocalExecutionPlanner.Visitor#visitScanFilterAndProject 里面对 concat(o_orderstatus, o_orderpriority) 进行代码生成,最终调用到 com.facebook.presto.operator.scalar.ConcatFunction 的 generateConcat 方法,其就是 Presto 的 concat 函数实现逻辑,如下:

// arity 代表 Concat 函数输入参数的个数
private static Class<?> generateConcat(TypeSignature type, int arity)
{
	checkCondition(arity <= 254, NOT_SUPPORTED, "Too many arguments for string concatenation");
	// 定义动态代码生成的类名,生成的类名大概是 varchar_concat2ScalarFunction_20211011_062900_3 样子的
	ClassDefinition definition = new ClassDefinition(
		a(PUBLIC, FINAL),
		makeClassName(type.getBase() + "_concat" + arity + "ScalarFunction"),
		type(Object.class));

	// 生成类的构造函数,这里是使用 private 修饰的
	// Generate constructor
	definition.declareDefaultConstructor(a(PRIVATE));

	// Generate concat()

	// 定义 concat 函数的参数,比如 arg0、arg1;类型是 Slice
	List<Parameter> parameters = IntStream.range(0, arity)
		.mapToObj(i -> arg("arg" + i, Slice.class))
		.collect(toImmutableList());

	// 定义一个名为 concat 的函数,它的修饰符是 public static,
	// 返回类型是 Slice,输入参数是上面定义的 arg0、arg1 等。
	MethodDefinition method = definition.declareMethod(a(PUBLIC, STATIC), "concat", type(Slice.class), parameters);
	Scope scope = method.getScope();
	BytecodeBlock body = method.getBody();

    // 定义一个名为 length 的局部变量,类型为 int
	Variable length = scope.declareVariable(int.class, "length");
	// length 变量初始化为0
	body.append(length.set(constantInt(0)));

	// 下面是计算 concat 函数每个参数的长度(其实就是调用 string 的 length 方法)
	// 然后再把得到的字符串长度加到 length 里面,并赋值给 length
	for (int i = 0; i < arity; ++i) {
		body.append(length.set(generateCheckedAdd(length, parameters.get(i).invoke("length", int.class))));
	}

	// 定义一个名为 result 的局部变量,类型为 Slice
	Variable result = scope.declareVariable(Slice.class, "result");
	// 调用 Slices 的 allocate 方法分配出长度为 length 空间的 Slice 对象,并赋值给 result
	body.append(result.set(invokeStatic(Slices.class, "allocate", Slice.class, length)));

	// 定义一个名为 position 的局部变量,类型为 int,赋值为 0
	Variable position = scope.declareVariable(int.class, "position");
	body.append(position.set(constantInt(0)));

	// 下面是循环调用 result 的 setBytes 方法,并分别把 arg0、arg1 里面的内容放到 result 里面去
	// 最后计算 arg0 或 arg1 字符串长度再加上 position 的值,结果再赋值给 position
	for (int i = 0; i < arity; ++i) {
		body.append(result.invoke("setBytes", void.class, position, parameters.get(i)));
		body.append(position.set(add(position, parameters.get(i).invoke("length", int.class))));
	}

	// 返回 result
	body.getVariable(result)
		.retObject();

	// 定义生成的类,并把它加载打破 DynamicClassLoader 里面去
	return defineClass(definition, Object.class, ImmutableMap.of(), new DynamicClassLoader(ConcatFunction.class.getClassLoader()));
}

private static BytecodeExpression generateCheckedAdd(BytecodeExpression x, BytecodeExpression y)
{
    // 调用 ConcatFunction 类里面的 checkedAdd 静态方法
    return invokeStatic(ConcatFunction.class, "checkedAdd", int.class, x, y);
}

@UsedByGeneratedCode
public static int checkedAdd(int x, int y)
{
    try {
        return addExact(x, y);
    }
    catch (ArithmeticException e) {
        throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "Concatenated string is too large");
    }
}

为了方便理解,我对 generateConcat 方法的实现进行了注释,应该很好理解。为了性能问题,最终生成的函数会进行缓存,下一次再调用 concat 函数,只要函数签名一样,就不用再一次进行 concat 代码的生成。比如我们前面的例子是对两个字符串进行合并(函数签名为 presto.default.concat(varchar,varchar):varchar ),如果下一次还是调用这个函数就不用再进行代码生成了。但是如果下一次是对三个字符串进行合并,还是要进行一次代码生成的。

到这里,大家可能还是不太明白 Presto 代码生成到底生成了什么东西。这里我就进一步介绍一下。如果运行我们上面的 SQL 查询,Presto 生成的 concat 实现大概如下面所示:


package com.facebook.presto.$gen;

import com.facebook.presto.operator.scalar.ConcatFunction;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public final class varchar_concat2ScalarFunction_20211011_062900_3 {
    private varchar_concat2ScalarFunction_20211011_062900_3() {
    }

    public static Slice concat(Slice arg0, Slice arg1) {
        int length = 0;
        int length = ConcatFunction.checkedAdd(length, arg0.length());
        length = ConcatFunction.checkedAdd(length, arg1.length());
        Slice result = Slices.allocate(length);
        int position = 0;
        result.setBytes(position, arg0);
        int position = position + arg0.length();
        result.setBytes(position, arg1);
        int var10000 = position + arg1.length();
        return result;
    }
}

注意,Presto 里面生成的是 Java 字节码,这里只是为了说明的方便,给出了 Java 源代码。可以看到,最终生成的代码其实很好理解。Presto 里面对两个字符串进行 concat 其实就是执行上面的代码片段。

总结

本文通过两个例子简单的介绍了 Presto 里面动态代码生成的用法,通过上面两个例子,相信大家应该能够大概了解 Presto 里面的动态代码生成的技术和用法。后面有空我会介绍一下表达式这块代码生成和整个 page 处理逻辑是怎么联系到一起的,敬请关注。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【动态代码生成技术在 Presto 中使用简介】(https://www.iteblog.com/archives/10047.html)
喜欢 (1)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!