美文网首页
flink读取写入RocketMQ(String)

flink读取写入RocketMQ(String)

作者: 安全的小飞飞 | 来源:发表于2020-04-07 15:42 被阅读0次

之前在网上找了个例子:https://www.freesion.com/article/53297012/ 写的挺好。
github已经有开源地址RocketMQ-Flink。(读取可以根据github和这个例子写)

上面那个例子写了String的反序列化类SimpleStringDeserializationSchema。没有写对应的序列化类。

1、今天写项目就恰好用到了。记录下:

import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;

import java.nio.charset.StandardCharsets;

public class SimpleStringSerializationSchema implements KeyValueSerializationSchema<String> {
    private static final long serialVersionUID = 609645086382422113L;

    @Override
    public byte[] serializeKey(String key) {
        return null;
    }

    @Override
    public byte[] serializeValue(String value) {
        return value != null ? value.getBytes(StandardCharsets.UTF_8) : null;
    }
}

注意:serialVersionUID 这个是用idea生成的。在自己包下重新生成下,以免有问题。(方法:https://blog.csdn.net/ShotMoon/article/details/80496407

2、写入MQ:

  Properties props1 = new Properties();
  props1.setProperty(RocketMQConfig.NAME_SERVER_ADDR, Config.getMQServer());
  props1.setProperty(RocketMQConfig.PRODUCER_GROUP,Config.getProduceGroup());
  DefaultTopicSelector selector = new DefaultTopicSelector(Config.getTopic(),Config.getTagName());

 KeyValueSerializationSchema schema1 = new SimpleStringSerializationSchema();
 result.addSink(new RocketMQSink(schema1, selector, props1));

具体参数按照自己的写就行。

完美。

相关文章

网友评论

      本文标题:flink读取写入RocketMQ(String)

      本文链接:https://www.haomeiwen.com/subject/bldnphtx.html