美文网首页
Apache Phoenix(三)新特性之用户定义函数UDFS(

Apache Phoenix(三)新特性之用户定义函数UDFS(

作者: 我知他风雨兼程途径日暮不赏 | 来源:发表于2020-02-10 13:02 被阅读0次

首先我们编写个简单的反转类,进行demo演示。

1. 创建类

  创建一个ScalarFunction的派生类,叫做UserDefineFunction。它需要做的事就是字符串反转。

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.function.ScalarFunction;
import org.apache.phoenix.parse.FunctionParseNode;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarchar;

import java.sql.SQLException;
import java.util.List;

/**
 * @author Lczy-Huang
 * @classname UserDefineFunction
 * @desc TODO
 * @date 2020-02-07 15:15
 **/
public class UserDefineFunction extends ScalarFunction {
    public UserDefineFunction() {
    }

    public UserDefineFunction(List<Expression> children) throws SQLException {
        super(children);
    }


    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        return false;
    }

    @Override
    public String getName() {
        // TODO Auto-generated method stub
        return NAME;
    }
}

2.加上注解

  添加对应的注解在UserDefineFunction指定对应名称,期望参数和它的数据类型。在如下的例子中,是一个简单的varchar类型参数,char类型是可以隐式转varchar的。

/**
 * @author Lczy-Huang
 * @classname UserDefineFunction
 * @desc TODO
 * @date 2020-02-07 15:15
 **/
@FunctionParseNode.BuiltInFunction(name= UserDefineFunction.NAME,  args={
        @FunctionParseNode.Argument(allowedTypes={PVarchar.class})} )
public class UserDefineFunction extends ScalarFunction {
    public UserDefineFunction() {
    }

    public UserDefineFunction(List<Expression> children) throws SQLException {
        super(children);
    }


    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        return false;
    }

    @Override
    public String getName() {
        // TODO Auto-generated method stub
        return NAME;
    }
}

3. 实现getDataType方法

  getDataType方法决定数据返回类型的函数。

/**
 * @author Lczy-Huang
 * @classname UserDefineFunction
 * @desc TODO
 * @date 2020-02-07 15:15
 **/
@FunctionParseNode.BuiltInFunction(name= UserDefineFunction.NAME,  args={
        @FunctionParseNode.Argument(allowedTypes={PVarchar.class})} )
public class UserDefineFunction extends ScalarFunction {
    public UserDefineFunction() {
    }

    public UserDefineFunction(List<Expression> children) throws SQLException {
        super(children);
    }


    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        return false;
    }

    @Override
    public String getName() {
        // TODO Auto-generated method stub
        return NAME;
    }

    public PDataType getDataType() {
        // TODO Auto-generated method stub
        return PVarchar.INSTANCE;
    }
}

4.编写evaluate函数

  实现evaluate方法,系统会调用该方法去计算每一行的结果。这个方法被传递一个Tuple元组,该元组具有当前行的状态和一个ImmutableBytesWritable对象,指向需要填充该函数执行的结果。这个方法如果返回fasle,代表现有信息不足以计算出可用的结果(通常情况下是因为其中的某个参数是未知的),如果true则相反。

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.function.ScalarFunction;
import org.apache.phoenix.parse.FunctionParseNode;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarchar;

import java.sql.SQLException;
import java.util.List;

/**
 * @author Lczy-Huang
 * @classname UserDefineFunction
 * @desc TODO
 * @date 2020-02-07 15:15
 **/
@FunctionParseNode.BuiltInFunction(name= UserDefineFunction.NAME,  args={
        @FunctionParseNode.Argument(allowedTypes={PVarchar.class})} )
public class UserDefineFunction extends ScalarFunction {
    // 反转字符串函数
    public static final String NAME = "REVERSE_STR1";

    public UserDefineFunction() {
    }

    public UserDefineFunction(List<Expression> children) throws SQLException {
        super(children);
    }


    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        // 获取子参数,首先对它评估值
        Expression arg = getChildren().get(0);
        if(!arg.evaluate(tuple,ptr)){
            return false;
        }
        if (ptr.getLength() == 0) {
            return true;
        }
        // 注意不能直接通过ptr.get()获得byte[]
        // 获取到的数组是ascii编码,所以转成string需要ascii解码
        byte[] bytes = ptr.copyBytes();
        String sourceStr = asciiToString(bytes);
        if(sourceStr==null){
            return true;
        }
        StringBuilder builder = new StringBuilder(sourceStr);
        ptr.set(builder.reverse().toString().getBytes());
        return true;
    }


    public PDataType getDataType() {
        // TODO Auto-generated method stub
        return PVarchar.INSTANCE;
    }

    @Override
    public String getName() {
        // TODO Auto-generated method stub
        return NAME;
    }


    public  String asciiToString(byte[] bytes){
        StringBuilder stringBuilder = new StringBuilder();
        for(int i=0;i<bytes.length;i++){
            stringBuilder.append((char)bytes[i]);
        }
        return stringBuilder.toString();
    }
}

5.打包上传运行结果

  将编写完的代码打包上传至hdfs上的hbase.dynamic.jars.dir下。

[root@master ~]# hdfs dfs -put phoenix_demo.jar  /hbase/lib/phoenix_demo.0.15.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slfaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/tez/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBin
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

  登陆phoenix上,执行创建函数命令基础格式如下:
CREATE FUNCTION 【函数名】(【输入类型】) RETURNS 【输出类型】 as '【具体的类路径】' using jar 【对应jar包的HDFS路径】

0: jdbc:phoenix:master:2181> CREATE FUNCTION REVERSE_21(VARCHAR) returns varchar as 'UserDefineFunction09' using jar 'hdfs://master:9820/hbase/lib/phoenix_demo.0.15.jar'

注意:版本更新最好将类路径和函数名进行更改,可能出现缓存,导致新更新的不生效。

6.运行截图

image.png

相关文章

网友评论

      本文标题:Apache Phoenix(三)新特性之用户定义函数UDFS(

      本文链接:https://www.haomeiwen.com/subject/zefaxhtx.html