场景
一般情况下，我们通过 Flume 采集程序把数据写入到另一个 Kafka 或 Elasticsearch 中。但这次的需求是把数据解析之后写入图数据库 Neo4j，而 Flume 没有提供这种 sink，所以需要我们自定义 sink 来实现。
实现
配置文件
# Flume agent "a1": Kafka source r1 -> memory channel c1 -> two custom Neo4j sinks (k1, k2).
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2
# r1 consumes topics a-topic and b-topic with consumer group "graph"; per the Flume KafkaSource
# docs it puts at most 500 messages, or waits at most 2000 ms, per batch written to the channel.
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 127.0.0.1:9092
a1.sources.r1.kafka.topics = a-topic,b-topic
a1.sources.r1.kafka.consumer.group.id = graph
# k1: CsaNeo4jSink.
#   graph.batchsize - max events the sink takes from c1 in one transaction.
#   graph.jdclass   - comma-separated jdcloud_alert.jd_class values this sink writes; events of
#                     other classes are still taken from the channel but are not written.
#   graph.name      - not read by any of the sink classes shown in this article.
#   graph.neo4j.*   - Bolt URL and credentials read by Neo4jStore.
#   graph.redis.*   - Redis pool settings; "maxidel" is spelled this way because that is the key
#                     Neo4jSink.configure reads. "expired" is the expiry passed to
#                     RedisManager.batchSet (units not shown here - TODO confirm).
a1.sinks.k1.type = com.jd.sink.CsaNeo4jSink
a1.sinks.k1.graph.name = preattackgraph
a1.sinks.k1.graph.batchsize = 500
a1.sinks.k1.graph.jdclass = 2,3,8,29,30,33,37
a1.sinks.k1.graph.neo4j.url = bolt://127.0.0.1:7687
a1.sinks.k1.graph.neo4j.username = neo4j
a1.sinks.k1.graph.neo4j.password = neo4j
a1.sinks.k1.graph.redis.maxactive = 100
a1.sinks.k1.graph.redis.maxidel = 10
a1.sinks.k1.graph.redis.maxwait = 2000
a1.sinks.k1.graph.redis.ip = 127.0.0.1
a1.sinks.k1.graph.redis.port = 6379
a1.sinks.k1.graph.redis.password = root
a1.sinks.k1.graph.redis.expired = 300
# k2: same sink class and connection settings as k1, but without graph.jdclass; how an unset
# filter is treated is decided by CsaNeo4jSink.otherInit.
a1.sinks.k2.type = com.jd.sink.CsaNeo4jSink
a1.sinks.k2.graph.batchsize = 500
a1.sinks.k2.graph.name = preattackgraph
a1.sinks.k2.graph.neo4j.url = bolt://127.0.0.1:7687
a1.sinks.k2.graph.neo4j.username = neo4j
a1.sinks.k2.graph.neo4j.password = neo4j
a1.sinks.k2.graph.redis.maxactive = 100
a1.sinks.k2.graph.redis.maxidel = 10
a1.sinks.k2.graph.redis.maxwait = 2000
a1.sinks.k2.graph.redis.ip = 127.0.0.1
a1.sinks.k2.graph.redis.port = 6379
a1.sinks.k2.graph.redis.password = root
a1.sinks.k2.graph.redis.expired = 300
# c1: in-memory channel. transactionCapacity must be at least the sinks' graph.batchsize (500),
# because each sink takes up to that many events inside a single transaction.
a1.channels.c1.type=memory
a1.channels.c1.keep-alive=60
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=10000
# Wiring: r1 writes to c1; k1 and k2 both drain c1.
# NOTE(review): sinks sharing one channel presumably split its events between them (each event
# reaches only one sink) - confirm this split is intended.
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1
自定义抽象的 Neo4jSink
/**
 * Base class for Flume sinks that write parsed events into Neo4j and record keys in Redis.
 *
 * <p>{@link #configure(Context)} reads the shared {@code graph.*} settings and then calls
 * {@link #otherInit(Context)} so subclasses can read their own keys. {@link #start()} opens the
 * Neo4j client and the Redis pool; {@link #stop()} closes the Neo4j client again.
 */
public abstract class Neo4jSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(Neo4jSink.class);

    /** Key prefix that CsaNeo4jSink uses for the per-event entries it writes to Redis. */
    public static final String CSA_GRAPH_ATTACK_PREFIX = "csa_graph_attack_";
    /** Key prefix for process entries; not referenced by the code in this article. */
    public static final String CSA_GRAPH_PROCESS_PREFIX = "csa_graph_process_";

    /** Max events taken from the channel per transaction ({@code graph.batchsize}, default 1000). */
    Integer batchSize;
    // Redis pool settings from graph.redis.*; passed unchanged to RedisPoolStore in start().
    Integer maxActive;
    Integer maxIdel;
    Integer maxWait;
    /** Redis host from {@code graph.redis.ip} (field name kept as-is because subclasses may use it). */
    String reidsUrl;
    Integer port;
    String password;
    /** Value of {@code graph.redis.expired}; CsaNeo4jSink passes it to RedisManager.batchSet. */
    Integer expired;
    Neo4jStore client;
    RedisPoolStore redisPool;
    /** Context from the last configure() call; Neo4jStore reads its connection keys from it. */
    Context context;

    /**
     * Starts the sink, then opens the Neo4j client and initializes the Redis pool.
     *
     * <p>Connection failures are logged, not rethrown, so the sink still reports as started;
     * a missing client will typically surface later as errors when process() uses it.
     */
    @Override
    public synchronized void start() {
        super.start();
        try {
            client = new Neo4jStore(context);
            logger.info("graph connect success");
            redisPool = new RedisPoolStore(maxActive, maxIdel, maxWait, reidsUrl, port, password);
            redisPool.init();
        } catch (Exception e) {
            logger.error("graph connect error", e);
        }
    }

    /** Takes a batch of events from the channel and writes them out; implemented by subclasses. */
    @Override
    public abstract Status process();

    /**
     * Closes the Neo4j client opened in {@link #start()} and stops the sink.
     *
     * <p>{@code super.stop()} runs even if closing the client fails.
     * NOTE(review): the Redis pool is not released here because RedisPoolStore's shutdown API is
     * not shown in this article - add that call if one exists.
     */
    @Override
    public synchronized void stop() {
        try {
            if (client != null) {
                client.close();
            }
        } catch (Exception e) {
            logger.warn("error closing graph client", e);
        } finally {
            super.stop();
        }
    }

    /**
     * Reads the shared {@code graph.*} settings, then lets the subclass read its own keys.
     *
     * @param context the sink's Flume configuration context; kept for {@link #start()}
     */
    @Override
    public void configure(Context context) {
        this.context = context;
        batchSize = context.getInteger("graph.batchsize", 1000);
        maxActive = context.getInteger("graph.redis.maxactive");
        maxIdel = context.getInteger("graph.redis.maxidel");
        maxWait = context.getInteger("graph.redis.maxwait");
        reidsUrl = context.getString("graph.redis.ip");
        port = context.getInteger("graph.redis.port");
        password = context.getString("graph.redis.password");
        expired = context.getInteger("graph.redis.expired");
        otherInit(context);
    }

    /**
     * Hook for subclass-specific configuration; called at the end of {@link #configure(Context)}.
     * The default implementation does nothing.
     */
    public void otherInit(Context context) {
    }

    /**
     * Builds the key CsaNeo4jSink uses for Redis entries and event vids.
     *
     * <p>Only {@code srcIp} and {@code serverId} contribute to the key; the port, destination
     * and window arguments are accepted but currently ignored.
     */
    public String generateNetKey(String srcIp, int srcPort, String destIp, int destPort, long window, String serverId) {
        return srcIp + "-" + serverId;
    }

    /** Label names for graph elements; not referenced by the sink code shown in this article. */
    public enum GraphLabel {
        PROCESS("process"),
        IP("ip"),
        EVENT("event"),
        SERVER("server"),
        SOCKET("socket"),
        NETLINK("netlink"),
        SERVERLINK("serverlink"),
        IN("in");

        GraphLabel(String name) {
            this.name = name;
        }

        private String name;

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }
}
实现类
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jd.datasource.RedisManager;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.mapdb.Atomic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
 * Flume sink that turns JSON alert events into Neo4j server, attacker and event nodes plus the
 * relationships between them, and records one key per event in Redis.
 *
 * <p>Each event body is parsed as a JSON object holding a {@code jdcloud_alert} object and the
 * optional top-level {@code src_port}, {@code dest_port} and {@code peer_ip_geo} fields.
 * NOTE(review): the full schema comes from the upstream producer - confirm against it.
 *
 * <p>Edge direction depends on {@code jd_class}. For classes 2 and 3 the sink writes
 * server->event and event->attacker edges, and the event's direction property is 1. For every
 * other class it writes attacker->event and event->server edges, with direction 2.
 */
public class CsaNeo4jSink extends Neo4jSink {
    private static final Logger logger = LoggerFactory.getLogger(CsaNeo4jSink.class);

    /** Upserts the server and attacker nodes and merges the event node for every row. */
    String event_sql = "UNWIND $message as row\n" +
            "MERGE (s:server{serverId:row.serverId})\n" +
            "on create set s.serverName=row.serverName,s.serverId=row.serverId,s.vid=row.serverVid,s.floatingIp=row.floatingIp,s.fixedIp=row.fixedIp,s.dataCenter=row.dataCenter,s.pin=row.pin\n" +
            "on match set s.serverName=row.serverName,s.serverId=row.serverId,s.vid=row.serverVid,s.floatingIp=row.floatingIp,s.fixedIp=row.fixedIp,s.dataCenter=row.dataCenter,s.pin=row.pin\n" +
            "MERGE (a:attacker{ip:row.ip})\n" +
            "on create set a.country=row.country,a.city=row.city,a.province=row.province,a.latitude=row.latitude,a.longitude=row.longitude,a.ip=row.ip\n" +
            "on match set a.country=row.country,a.city=row.city,a.province=row.province,a.latitude=row.latitude,a.longitude=row.longitude,a.ip=row.ip\n" +
            "MERGE (e:event {time:toInteger(row.time), status:toInteger(row.status),source:row.source,srcIp:row.srcIp, srcPort:toInteger(row.srcPort), destIp:row.destIp, destPort:toInteger(row.destPort), vid:row.eventVid,eventId:row.eventId, jdclass:toInteger(row.jdclass), severity:toInteger(row.severity), pin:row.pin, window:toInteger(row.window),serverId:row.serverId, direction:toInteger(row.direction)});";
    // The edge statements MATCH existing nodes, so they must run after event_sql.
    String attacker2event_sql = "UNWIND $message as row\n" +
            "MATCH (att:attacker {ip:row.ip})\n" +
            "MATCH (e:event {vid:row.eventVid})\n" +
            "MERGE (att)-[:attacker2event{eid:row.attacker2eventId}]->(e);";
    String event2attacker_sql = "UNWIND $message as row\n" +
            "MATCH (e:event {vid:row.eventVid})\n" +
            "MATCH (att:attacker {ip:row.ip})\n" +
            "MERGE (e)-[:event2attacker{eid:row.event2attackerId}]->(att);";
    String event2server_sql = "UNWIND $message as row\n" +
            "MATCH (e:event {vid:row.eventVid})\n" +
            "MATCH (ser:server {vid:row.serverVid})\n" +
            "MERGE (e)-[:event2server{eid:row.event2serverId}]->(ser);";
    String server2event_sql = "UNWIND $message as row\n" +
            "MATCH (ser:server {vid:row.serverVid})\n" +
            "MATCH (e:event {vid:row.eventVid})\n" +
            "MERGE (ser)-[:server2event{eid:row.server2eventId}]->(e);";

    /** Accepted jd_class values as strings; {@code null} means every class is accepted. */
    private List<String> jdClasses;

    /** Accumulates the Cypher parameter rows and Redis keys for one process() call. */
    private static final class Batch {
        final List<JSONObject> events = new ArrayList<>();
        final List<JSONObject> attacker2event = new ArrayList<>();
        final List<JSONObject> event2server = new ArrayList<>();
        final List<JSONObject> server2event = new ArrayList<>();
        final List<JSONObject> event2attacker = new ArrayList<>();
        final Map<String, String> redisData = new HashMap<>();
    }

    /**
     * Takes up to {@code batchSize} events in one channel transaction, writes the parsed rows
     * to Neo4j, commits, and then writes the per-event Redis keys.
     *
     * <p>An event that fails to parse is logged and dropped; its take is still committed so it
     * cannot block the channel. Any other failure rolls the transaction back.
     *
     * @return {@link Status#BACKOFF} if the channel was empty or the batch failed, otherwise
     *         {@link Status#READY}
     */
    @Override
    public Status process() {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        Batch batch = new Batch();
        int taken = 0;
        try {
            while (taken < batchSize) {
                Event event = channel.take();
                if (event == null) {
                    // Channel drained: stop instead of polling an empty channel batchSize times.
                    break;
                }
                taken++;
                String body = "";
                try {
                    body = new String(event.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    collect(JSON.parseObject(body), batch);
                } catch (Exception e) {
                    logger.error("log parse error " + body, e);
                }
            }
            writeGraph(batch);
            transaction.commit();
        } catch (Throwable e) {
            logger.error("Failed to commit transaction. Transaction rolled back.", e);
            try {
                transaction.rollback();
            } catch (Exception e2) {
                logger.error("Exception in rollback. Rollback might not have been successful.", e2);
            }
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
        // Redis is written only after a successful commit; a failure here cannot undo the commit.
        writeRedis(batch.redisData);
        // BACKOFF on an empty poll lets the SinkRunner sleep instead of busy-spinning.
        return taken == 0 ? Status.BACKOFF : Status.READY;
    }

    /** Parses one alert into graph rows and a Redis key; skips alerts filtered by jd_class or lacking a server id. */
    private void collect(JSONObject bodyObj, Batch batch) {
        JSONObject alert = Objects.requireNonNull(bodyObj.getJSONObject("jdcloud_alert"), "jdcloud_alert");
        Integer jdclassValue = alert.getInteger("jd_class");
        if (jdClasses != null && !jdClasses.contains(jdclassValue + "")) {
            return;
        }
        int jdclass = Objects.requireNonNull(jdclassValue, "jdcloud_alert.jd_class");

        int srcPort = defaultInt(bodyObj.getInteger("src_port"));
        int destPort = defaultInt(bodyObj.getInteger("dest_port"));
        String peerIp = StringUtils.defaultString(alert.getString("peer_ip"));
        String floatingIp = StringUtils.defaultString(alert.getString("floating_ip"));
        String fixedIp = alert.getString("fixed_ip");
        int severity = Objects.requireNonNull(alert.getInteger("severity"), "jdcloud_alert.severity");
        String eventId = StringUtils.defaultString(alert.getString("event_id"));
        String serverId = StringUtils.defaultString(alert.getString("server_id"));
        long timestamp = alert.getLong("timestamp");
        String pin = StringUtils.defaultString(alert.getString("pin"));
        String dataCenter = alert.getString("data_center");
        String serverVid = dataCenter + "-" + serverId;
        String serverName = alert.getString("server_name");
        JSONObject geo = bodyObj.getJSONObject("peer_ip_geo");

        // Round down to a 300-unit window; assumes timestamp is epoch millis (TODO confirm).
        long window = timestamp / 1000;
        window -= window % 300;
        // jd_class 38 swaps the source and destination addresses when building the key.
        String uid = jdclass != 38
                ? generateNetKey(peerIp, srcPort, fixedIp, destPort, window, serverId)
                : generateNetKey(fixedIp, srcPort, peerIp, destPort, window, serverId);
        int status = defaultInt(alert.getInteger("status"));
        if (serverId.isEmpty()) {
            return;
        }

        batch.redisData.put(CSA_GRAPH_ATTACK_PREFIX + uid, eventId);
        String eventVid = uid + "-" + eventId;

        JSONObject eventParams = new JSONObject();
        generateAttacker(eventParams, peerIp, geo);
        generateServer(eventParams, serverName, serverId, serverVid, floatingIp, fixedIp, dataCenter, pin);
        generateEvent(eventParams, timestamp, status, alert.getString("source"), peerIp, srcPort,
                floatingIp, destPort, eventVid, eventId, jdclass, severity, window);
        batch.events.add(eventParams);

        if (isServerToAttacker(jdclass)) {
            JSONObject event2attacker = new JSONObject();
            event2attacker.put("event2attackerId", eventId);
            event2attacker.put("ip", peerIp);
            event2attacker.put("eventVid", eventVid);
            batch.event2attacker.add(event2attacker);

            JSONObject server2event = new JSONObject();
            server2event.put("server2eventId", eventId);
            server2event.put("eventVid", eventVid);
            server2event.put("serverVid", serverVid);
            batch.server2event.add(server2event);
        } else {
            JSONObject attacker2event = new JSONObject();
            attacker2event.put("attacker2eventId", eventId);
            attacker2event.put("ip", peerIp);
            attacker2event.put("eventVid", eventVid);
            batch.attacker2event.add(attacker2event);

            JSONObject event2server = new JSONObject();
            event2server.put("event2serverId", eventId);
            event2server.put("eventVid", eventVid);
            event2server.put("serverVid", serverVid);
            batch.event2server.add(event2server);
        }
    }

    /** True for jd_class 2 and 3, whose edges run server->event->attacker (event direction 1). */
    private static boolean isServerToAttacker(int jdclass) {
        return jdclass == 2 || jdclass == 3;
    }

    /** Unboxes an optional integer, treating a missing value as 0. */
    private static int defaultInt(Integer value) {
        return value == null ? 0 : value;
    }

    /** Runs each non-empty row list through its Cypher statement, nodes before edges. */
    private void writeGraph(Batch batch) {
        executeIfPresent(batch.events, event_sql);
        executeIfPresent(batch.attacker2event, attacker2event_sql);
        executeIfPresent(batch.event2server, event2server_sql);
        executeIfPresent(batch.server2event, server2event_sql);
        executeIfPresent(batch.event2attacker, event2attacker_sql);
    }

    /** Sends {@code rows} as the {@code $message} parameter of {@code cypher}, skipping empty lists. */
    private void executeIfPresent(List<JSONObject> rows, String cypher) {
        if (!rows.isEmpty()) {
            client.execute(rows, cypher);
        }
    }

    /** Writes the batch's Redis keys with the configured expiry; failures are logged, and the manager is always closed. */
    private void writeRedis(Map<String, String> redisData) {
        if (redisData.isEmpty()) {
            return;
        }
        try {
            RedisManager redisManager = new RedisManager(redisPool);
            try {
                redisManager.batchSet(redisData, expired);
            } finally {
                redisManager.close();
            }
        } catch (Exception e) {
            logger.error("redis write error", e);
        }
    }

    private void generateServer(JSONObject params, String serverName, String serverId, String vid, String floatingIp, String fixedIp, String dataCenter, String pin) {
        params.put("serverName", serverName);
        params.put("serverId", serverId);
        params.put("serverVid", vid);
        params.put("floatingIp", floatingIp);
        params.put("fixedIp", fixedIp);
        params.put("dataCenter", dataCenter);
        params.put("pin", pin);
    }

    /** Adds the attacker's ip and geo fields to {@code params}; missing geo fields become "". */
    private void generateAttacker(JSONObject params, String peerIp, JSONObject geo) {
        String[] geoKeys = {"country", "city", "province", "latitude", "longitude"};
        for (String key : geoKeys) {
            params.put(key, geo == null ? "" : StringUtils.defaultString(geo.getString(key)));
        }
        params.put("ip", peerIp);
    }

    /** Adds the event fields to {@code params}; direction is 1 for jd_class 2/3 and 2 otherwise. */
    private void generateEvent(JSONObject params, long timestamp, int status, String source, String peerIp,
                               int srcPort, String floatingIp, int destPort, String eventVid, String eventId,
                               int jdclass, int severity, long window) {
        params.put("time", timestamp);
        params.put("status", status);
        // event_sql MERGEs on source; Cypher rejects null MERGE properties, which would fail the whole batch.
        params.put("source", StringUtils.defaultString(source));
        params.put("srcIp", peerIp);
        params.put("srcPort", srcPort);
        params.put("destIp", floatingIp);
        params.put("destPort", destPort);
        params.put("eventVid", eventVid);
        params.put("eventId", eventId);
        params.put("jdclass", jdclass);
        params.put("severity", severity);
        params.put("window", window);
        params.put("direction", isServerToAttacker(jdclass) ? 1 : 2);
    }

    /**
     * Reads {@code graph.jdclass}, a comma-separated list of accepted jd_class values.
     * An unset or blank value means no filter, so every class is accepted.
     */
    @Override
    public void otherInit(Context context) {
        String jdclass = context.getString("graph.jdclass");
        if (StringUtils.isBlank(jdclass)) {
            jdClasses = null;
            return;
        }
        List<String> classes = new ArrayList<>();
        for (String item : jdclass.split(",")) {
            String trimmed = item.trim();
            if (!trimmed.isEmpty()) {
                classes.add(trimmed);
            }
        }
        jdClasses = classes.isEmpty() ? null : classes;
    }
}
Neo4jStore 实现
import com.alibaba.fastjson.JSONObject;
import com.google.api.client.util.Preconditions;
import org.apache.flume.Context;
import org.neo4j.driver.v1.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.neo4j.driver.v1.Values.parameters;
/**
 * Thin wrapper around a Neo4j Bolt {@link Driver} that runs a Cypher statement with a list of
 * rows bound to the {@code $message} parameter.
 *
 * <p>Connection and execution errors are logged rather than thrown.
 */
public class Neo4jStore {
    private final static Logger logger = LoggerFactory.getLogger(Neo4jStore.class);
    public static final String INDEX_PREFIX = "jdcloud-seclog-merge-stage1-";

    String url;
    String username;
    String password;
    Driver driver = null;

    /** Creates an unconfigured store; url/username/password must be set before {@link #setup()}. */
    public Neo4jStore() {}

    /**
     * Creates a store from the {@code graph.neo4j.url}, {@code graph.neo4j.username} and
     * {@code graph.neo4j.password} keys of a Flume context.
     *
     * @throws NullPointerException if any of the three keys is missing
     */
    public Neo4jStore(Context context) {
        this(context.getString("graph.neo4j.url"),
                context.getString("graph.neo4j.username"),
                context.getString("graph.neo4j.password"));
    }

    /**
     * Creates a store and tries to open the driver immediately.
     *
     * @throws NullPointerException if any argument is null
     */
    public Neo4jStore(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
        Preconditions.checkNotNull(url, "graph.neo4j.url must be set!!");
        Preconditions.checkNotNull(username, "graph.neo4j.username must be set!!");
        Preconditions.checkNotNull(password, "graph.neo4j.password must be set!!");
        setup();
    }

    /** Opens the driver with basic auth; on failure logs the error and leaves {@code driver} unchanged. */
    public void setup() {
        try {
            driver = GraphDatabase.driver(url, AuthTokens.basic(username, password));
        } catch (Exception e) {
            logger.error("init neo4j error", e);
        }
    }

    /**
     * Runs {@code sql} with {@code message} bound to {@code $message}.
     *
     * <p>Retries {@link #setup()} once if no driver is open. Errors are logged, not thrown,
     * so callers cannot tell whether the write succeeded.
     *
     * @param message rows to bind; each JSONObject is sent as a map
     * @param sql     Cypher statement that reads {@code $message}
     */
    public void execute(List<JSONObject> message, String sql) {
        if (driver == null) {
            setup();
        }
        if (driver == null) {
            logger.error("execute neo4j error: driver is not initialized");
            return;
        }
        // try-with-resources closes the session even when run() throws.
        try (Session session = driver.session()) {
            session.run(sql, parameters("message", message));
        } catch (Exception e) {
            logger.error("execute neo4j error", e);
        }
    }

    /** Closes the driver if one was opened. */
    public void close() {
        if (driver != null) {
            driver.close();
        }
    }
}










网友评论