这期内容当中小编将会给大家带来有关flume中如何自定义Interceptor,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
flume现状:
对于这类比较个性化的日志格式转换,flume没有现成的相关插件可用
分析:
当source为文本文件时,flume会把每一行文本封装成一个event(单行长度默认上限为2048)
而拦截器就是针对event来做处理的
代码:
package com.wy.flume.interceptor;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
/**
 * Flume interceptor that reformats a comma-separated ad-referer log line into a
 * tab-separated record.
 *
 * <p>The event body is split on commas into at most 8 fields. The first seven are
 * copied through unchanged. The eighth is expected to look like
 * {@code "<url>" "<user-agent>"<rest>}; from it the interceptor derives four output
 * columns: url, operating system, browser and browser version. The resulting body is
 * therefore 11 tab-separated columns, encoded as UTF-8.
 *
 * <p>When the eighth field does not match the expected shape, the derived columns fall
 * back to an empty url, {@code "others"} for OS and browser, and an empty version.
 * Bodies with fewer than 8 comma-separated fields are passed through unchanged.
 *
 * <p>Instances are created through {@link Builder}.
 */
public class AdRefererLogFormatInterceptor implements Interceptor {

  /** Number of comma-separated fields expected in a body; the last one holds url + user-agent. */
  private static final int FIELD_COUNT = 8;

  /** Value reported for OS / browser when no table entry matches. */
  private static final String UNKNOWN = "others";

  /** Matches the trailing field: quoted url, whitespace, quoted user-agent, then anything. */
  private static final Pattern TAIL_PATTERN = Pattern.compile("^\"(.*)\"\\s\"(.*)\"(.*)$");

  /**
   * OS detection table. Iteration order is insertion order and the first match wins, so
   * specific keys (e.g. "windows nt 6.1") must be registered before generic ones
   * (e.g. "windows").
   */
  private static final Map<Pattern, String> PLATFORMS = initPlatforms();

  /**
   * Browser detection table, same first-match-wins ordering rule as {@link #PLATFORMS}.
   * Each pattern captures the version number following the browser token in group 1.
   */
  private static final Map<Pattern, String> BROWSERS = initBrowsers();

  /** Builds the ordered, precompiled OS detection table. */
  private static Map<Pattern, String> initPlatforms() {
    Map<Pattern, String> platforms = new LinkedHashMap<>();
    addPlatform(platforms, "windows nt 6.2", "Win8");
    addPlatform(platforms, "windows nt 6.1", "Win7");
    addPlatform(platforms, "windows nt 6.0", "Win Longhorn");
    addPlatform(platforms, "windows nt 5.2", "Win2003");
    addPlatform(platforms, "windows nt 5.0", "Win2000");
    addPlatform(platforms, "windows nt 5.1", "WinXP");
    addPlatform(platforms, "windows nt 4.0", "Windows NT 4.0");
    addPlatform(platforms, "winnt4.0", "Windows NT 4.0");
    addPlatform(platforms, "winnt 4.0", "Windows NT");
    addPlatform(platforms, "winnt", "Windows NT");
    addPlatform(platforms, "windows 98", "Win98");
    addPlatform(platforms, "win98", "Win98");
    addPlatform(platforms, "windows 95", "Win95");
    addPlatform(platforms, "win95", "Win95");
    addPlatform(platforms, "windows", "Unknown Windows OS");
    addPlatform(platforms, "os x", "MacOS X");
    addPlatform(platforms, "ppc mac", "Power PC Mac");
    addPlatform(platforms, "freebsd", "FreeBSD");
    addPlatform(platforms, "ppc", "Macintosh");
    addPlatform(platforms, "linux", "Linux");
    addPlatform(platforms, "debian", "Debian");
    addPlatform(platforms, "sunos", "Sun Solaris");
    addPlatform(platforms, "beos", "BeOS");
    addPlatform(platforms, "apachebench", "ApacheBench");
    addPlatform(platforms, "aix", "AIX");
    addPlatform(platforms, "irix", "Irix");
    addPlatform(platforms, "osf", "DEC OSF");
    addPlatform(platforms, "hp-ux", "HP-UX");
    addPlatform(platforms, "netbsd", "NetBSD");
    addPlatform(platforms, "bsdi", "BSDi");
    addPlatform(platforms, "openbsd", "OpenBSD");
    addPlatform(platforms, "gnu", "GNU/Linux");
    addPlatform(platforms, "unix", "Unknown Unix OS");
    return platforms;
  }

  /** Builds the ordered, precompiled browser detection table. */
  private static Map<Pattern, String> initBrowsers() {
    Map<Pattern, String> browsers = new LinkedHashMap<>();
    addBrowser(browsers, "Flock", "Flock");
    addBrowser(browsers, "Chrome", "Chrome");
    addBrowser(browsers, "Opera", "Opera");
    addBrowser(browsers, "MSIE", "IE");
    addBrowser(browsers, "Internet Explorer", "IE");
    addBrowser(browsers, "Shiira", "Shiira");
    addBrowser(browsers, "Firefox", "Firefox");
    addBrowser(browsers, "Chimera", "Chimera");
    addBrowser(browsers, "Phoenix", "Phoenix");
    addBrowser(browsers, "Firebird", "Firebird");
    addBrowser(browsers, "Camino", "Camino");
    addBrowser(browsers, "Netscape", "Netscape");
    addBrowser(browsers, "OmniWeb", "OmniWeb");
    addBrowser(browsers, "Safari", "Safari");
    addBrowser(browsers, "Mozilla", "Mozilla");
    addBrowser(browsers, "Konqueror", "Konqueror");
    addBrowser(browsers, "icab", "iCab");
    addBrowser(browsers, "Lynx", "Lynx");
    addBrowser(browsers, "Links", "Links");
    addBrowser(browsers, "hotjava", "HotJava");
    addBrowser(browsers, "amaya", "Amaya");
    addBrowser(browsers, "IBrowse", "IBrowse");
    return browsers;
  }

  /** Registers a case-insensitive literal match of {@code token} mapped to {@code label}. */
  private static void addPlatform(Map<Pattern, String> table, String token, String label) {
    table.put(Pattern.compile(Pattern.quote(token), Pattern.CASE_INSENSITIVE), label);
  }

  /**
   * Registers a case-insensitive match of {@code token} followed (lazily) by a run of
   * digits and dots, which is captured as the version in group 1.
   */
  private static void addBrowser(Map<Pattern, String> table, String token, String label) {
    table.put(
        Pattern.compile(Pattern.quote(token) + ".*?([0-9\\.]+)", Pattern.CASE_INSENSITIVE),
        label);
  }

  /** Instances are obtained through {@link Builder#build()}. */
  private AdRefererLogFormatInterceptor() {
  }

  /** No initialization is required. */
  @Override
  public void initialize() {
    // NO-OP...
  }

  /** No resources are held, so there is nothing to release. */
  @Override
  public void close() {
    // NO-OP...
  }

  /**
   * Rewrites the body of {@code event} in place into the tab-separated output format.
   *
   * @param event the event whose body is a comma-separated log line encoded as UTF-8
   * @return the same event instance; its body is replaced unless the body is
   *     {@code null} or has fewer than 8 comma-separated fields, in which case the
   *     event is returned untouched
   */
  @Override
  public Event intercept(Event event) {
    byte[] rawBody = event.getBody();
    if (rawBody == null) {
      return event;
    }
    String body = new String(rawBody, Charsets.UTF_8);
    String[] fields = body.split(",", FIELD_COUNT);
    if (fields.length < FIELD_COUNT) {
      // Malformed / blank line: pass it through rather than failing the whole batch
      // with an ArrayIndexOutOfBoundsException.
      return event;
    }

    StringBuilder sb = new StringBuilder(body.length() + 32);
    for (int i = 0; i < FIELD_COUNT - 1; i++) {
      sb.append(fields[i]).append('\t');
    }

    String url = "";
    String os = UNKNOWN;
    String br = UNKNOWN;
    String ver = "";

    Matcher tail = TAIL_PATTERN.matcher(fields[FIELD_COUNT - 1].trim());
    if (tail.matches()) {
      url = tail.group(1);
      String agent = tail.group(2);

      // Operating system: first table entry found anywhere in the user-agent wins.
      for (Map.Entry<Pattern, String> entry : PLATFORMS.entrySet()) {
        if (entry.getKey().matcher(agent).find()) {
          os = entry.getValue();
          break;
        }
      }

      // Browser and version: first table entry that is followed by a version number wins.
      for (Map.Entry<Pattern, String> entry : BROWSERS.entrySet()) {
        Matcher browserMatcher = entry.getKey().matcher(agent);
        if (browserMatcher.find()) {
          ver = browserMatcher.group(1);
          br = entry.getValue();
          break;
        }
      }
    }

    sb.append(url).append('\t')
        .append(os).append('\t')
        .append(br).append('\t')
        .append(ver);

    // Encode with the same charset used for decoding instead of the platform default.
    event.setBody(sb.toString().getBytes(Charsets.UTF_8));
    return event;
  }

  /**
   * Applies {@link #intercept(Event)} to every event in order.
   *
   * @param events the batch to process
   * @return a new list holding the processed events; {@code null} results are omitted
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
    for (Event event : events) {
      Event interceptedEvent = intercept(event);
      if (interceptedEvent != null) {
        intercepted.add(interceptedEvent);
      }
    }
    return intercepted;
  }

  /**
   * Builder referenced from the Flume configuration
   * ({@code ...AdRefererLogFormatInterceptor$Builder}) to create the interceptor.
   */
  public static class Builder implements Interceptor.Builder {

    /** Creates a new interceptor instance. */
    @Override
    public Interceptor build() {
      return new AdRefererLogFormatInterceptor();
    }

    /** This interceptor has no configurable properties; the context is ignored. */
    @Override
    public void configure(Context context) {
    }
  }
}
部署:
1、将程序打包成AdRefererLogInterceptor.jar
2、将jar包上传到FLUME_HOME的lib目录下(flume1.5采用bin安装)
3、在配置文件中使用Interceptor
hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.wy.flume.interceptor.AdRefererLogFormatInterceptor$Builder
优势:
在数据传输的同时进行数据的处理,节省步骤,而且有flume帮助管理文件进度,程序中断时不用手动做恢复(file channel)
总结:
在Interceptor中可以对event的header 和 body 进行处理,进而达到定制化的目的。
上述就是小编为大家分享的flume中如何自定义Interceptor了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。