美文网首页
尚硅谷大数据技术之Flume

尚硅谷大数据技术之Flume

作者: 尚硅谷教育 | 来源:发表于2018-12-06 15:06 被阅读18次

5.4.4 MySQLSource
代码实现:
package com.atguigu.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class SQLSource extends AbstractSource implements Configurable, PollableSource {

//打印日志

private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);

//定义sqlHelper
private SQLSourceHelper sqlSourceHelper;


@Override
public long getBackOffSleepIncrement() {
    return 0;
}

@Override
public long getMaxBackOffSleepInterval() {
    return 0;
}

@Override

public void configure(Context context) {

    try {
        //初始化
        sqlSourceHelper = new SQLSourceHelper(context);
    } catch (ParseException e) {
        e.printStackTrace();
    }
}

@Override

public Status process() throws EventDeliveryException {

    try {
        //查询数据表
        List<List<Object>> result = sqlSourceHelper.executeQuery();

        //存放event的集合
        List<Event> events = new ArrayList<>();

        //存放event头集合
        HashMap<String, String> header = new HashMap<>();

        //如果有返回数据,则将数据封装为event
        if (!result.isEmpty()) {

            List<String> allRows = sqlSourceHelper.getAllRows(result);

            Event event = null;

            for (String row : allRows) {
                event = new SimpleEvent();
                event.setBody(row.getBytes());
                event.setHeaders(header);
                events.add(event);
            }

            //将event写入channel
            this.getChannelProcessor().processEventBatch(events);

            //更新数据表中的offset信息
            sqlSourceHelper.updateOffset2DB(result.size());
        }

        //等待时长
        Thread.sleep(sqlSourceHelper.getRunQueryDelay());

        return Status.READY;
    } catch (InterruptedException e) {
        LOG.error("Error procesing row", e);

        return Status.BACKOFF;
    }
}

@Override

public synchronized void stop() {

    LOG.info("Stopping sql source {} ...", getName());

    try {
        //关闭资源
        sqlSourceHelper.close();
    } finally {
        super.stop();
    }
}

}

本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

相关文章

网友评论

      本文标题:尚硅谷大数据技术之Flume

      本文链接:https://www.haomeiwen.com/subject/eejocqtx.html