RT,我想在Flume中加入一个自定义的拦截器,可是遇到了问题
下面是我flume的配置文件:
agent.sources=s1
agent.channels=c1
agent.sinks=k1agent.sources.s1.type=syslogtcp
agent.sources.s1.host=192.168.1.37
agent.sources.s1.port=i20000
agent.sources.s1.channels=c1
agent.sources.s1.interceptors =i1
agent.sources.s1.interceptors.i1.type=com.flume.interceptor.HbInterceptor$Builderagent.sinks.k1.type=file_roll
agent.sinks.k1.sink.directory=/soft/flume/file_sink
agent.sinks.k1.sink.serializer=text
agent.sinks.k1.channel=c1agent.channels.c1.type=memory
agent.channels.c1.capacity=1000
agent.channels.c1.transactionCapacity=500
下面代码是我自定义的拦截器类:
package com.flume.interceptor;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;public class HbInterceptor implements Interceptor {

HbInterceptor(){

} public void close() { } public void initialize() { } public Event intercept(Event event) {
try {
byte[] body = event.getBody();
byte[] frame = Arrays.copyOfRange(body, 0, 2);
if (frame[0] == '#' && frame[1] == '#') {
byte[] dataLen = Arrays. copyOfRange(body, frame.length,
frame.length + 4);
String dataLenStr = String.valueOf(dataLen);
int dataLength = Integer.parseInt(dataLenStr);
byte[] data = Arrays.copyOfRange(body, frame.length
+ dataLen.length, frame.length + dataLen.length
+ dataLength);
byte[] crcCheck = Arrays.copyOfRange(body, frame.length
+ dataLen.length + dataLength, frame.length
+ dataLen.length + dataLength + 4);
int dataCRC = ByteStrToInt(crcCheck);
int CrcSum = checkOutSixteenCheckSum(data, dataLength);
if (dataCRC == CrcSum) {
event.setBody(data);
} else {
event = null;
}
} else {
event = null;
}
} catch (Exception e) {
e.printStackTrace();
} return event;
} public List<Event> intercept(List<Event> events) {
List<Event> intercepted = new ArrayList<Event>();
for (Event event : events) {
Event interceptorEvent = intercept(event);
if (interceptorEvent != null) {
intercepted.add(interceptorEvent);
}
}
return intercepted;
}

public static class Builer implements Interceptor.Builder{ public void configure(Context context) {

} public Interceptor build() {
return new HbInterceptor();
} } // \u5c06\u7ec8\u7aef\u8ba1\u7b97\u7684CRC\u8ba1\u7b97\u503c\u8f6c\u5316\u6210int\u578b
public int ByteStrToInt(byte[] checkSum) {
String str = new String(checkSum);
int iValule = Integer.parseInt(str, 16);
return iValule;
} // 212\u534f\u8bae\u7684CRC\u6821\u9a8c\u7b97\u6cd5
private int checkOutSixteenCheckSum(byte[] bytes, int len) {
int r = 0xffff;
char hi;
char flag;
for (int i = 0; i < len; i++) {
hi = (char) (r >> 8);
hi ^= bytes[i];
r = hi; for (int j = 0; j < 8; j++) {
flag = (char) (r & 0x0001);
r = r >> 1;
if (flag == 1) {
r ^= 0xa001;
}
}
}
return r;
}
}
我将上面的工程打成jar包放到了flume根目录中lib文件夹中,然后用上面的配置文件运行会报出java.lang.ClassNotFoundException: com.flume.interceptor.HbInterceptor$Builder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:190)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:48)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-03-13 11:26:20,128 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:361)] Source s1 has been removed due to an error during configuration
org.apache.flume.FlumeException: Interceptor.Builder not found.
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:116)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.flume.interceptor.HbInterceptor$Builder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:190)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:48)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
... 12 more
刚开始就接触Flume,求大神给指出哪里出问题了,谢谢!