扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这期内容当中小编将会给大家带来有关heka从kalka中读取数据的示例分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
创新互联是专业的如皋网站建设公司,如皋接单;提供网站制作、成都做网站,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行如皋网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
heka从kalka中读取数据。
配置:
[hekad]
maxprocs = 2
[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
[RstEncoder]
[LogOutput]
message_matcher = "TRUE"
encoder = "RstEncoder"
上述配置只有从kalfka中读取数据并显示到console,写到kalfka中数据,
结果
:Timestamp: 2016-07-21 09:39:46.342093657 +0000 UTC
:Type: heka.kafka
:Hostname: master
:Pid: 0
:Uuid: 501b0a0e-63a9-4eee-b9ca-ab572c17d273
:Logger: KafkaInputExample
:Payload: {"msg":"Start Request","event":"artemis.web.ensure-running1","userid":"12","extra":{"workspace-id":"cN907xLngi"},"time":"2015-05-06T 20:40:05.509926234Z","severity":1}
:EnvVersion:
:Severity: 7
:Fields:
| name:"Key" type:bytes value:
| name:"Topic" type:string value:"test"
| name:"Partition" type:integer value:0
| name:"Offset" type:integer value:8
读取出来的数据放到了payload中,而fileds中存放了读取kalkfa中的一些信息。那么可以使用jsondecoder进行解析。
[hekad]
maxprocs = 2
[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
decoder="JsonDecoder"
[JsonDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/json.lua"
[JsonDecoder.config]
type = "artemis"
payload_keep = true
map_fields = true
Severity = "severity"
[RstEncoder]
[LogOutput]
message_matcher = "TRUE"
encoder = "RstEncoder"
结果如下:
:Timestamp: 2016-07-21 09:42:34 +0000 UTC
:Type: artemis
:Hostname: master
:Pid: 0
:Uuid: 3965285c-70ac-4069-a1a3-a9bcf518d3e8
:Logger: KafkaInputExample
:Payload: {"msg":"Start Request","event":"artemis.web.ensure-running2","userid":"11","extra":{"workspace-id":"cN907xLngi"},"time":"2015-05-06T 20:40:05.509926234Z","severity":1}
:EnvVersion:
:Severity: 1
:Fields:
| name:"time" type:string value:"2015-05-06T 20:40:05.509926234Z"
| name:"msg" type:string value:"Start Request"
| name:"userid" type:string value:"11"
| name:"event" type:string value:"artemis.web.ensure-running2"
| name:"extra.workspace-id" type:string value:"cN907xLngi"
经过decoder解析之后,fileds发生了改变,但是我们可以看到Logger显示的还是KafkaInputExample,说明数据不是decoder产生,而是Input产生,只不过使用了decoder进行了解析,重写改写了fields而已。
接下来,把数据录入都es中吧。
[hekad]
maxprocs = 2
[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
decoder="JsonDecoder"
[JsonDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/json.lua"
[JsonDecoder.config]
type = "artemis"
payload_keep = true
map_fields = true
Severity = "severity"
[ESJsonEncoder]
index = "%{Type}-%{%Y.%m.%d}"
es_index_from_timestamp = true
type_name = "%{Type}"
[ESJsonEncoder.field_mappings]
Timestamp = "@timestamp"
Severity = "level"
[ElasticSearchOutput]
message_matcher = "TRUE"
encoder = "ESJsonEncoder"
flush_interval = 1
导入到es中,也需要json,所以使用ESJsonEncoder,同时指定索引名字和类型。执行结果如下,
可以看到,除了heka中元数据field之外,还有JsonDecoder生成field啊,其实是截取JsonDecoder中的fields属性中拿出。注意,Payload不解析。
:Fields:
| name:"time" type:string value:"2015-05-06T 20:40:05.509926234Z"
| name:"msg" type:string value:"Start Request"
| name:"userid" type:string value:"11"
| name:"event" type:string value:"artemis.web.ensure-running2"
| name:"extra.workspace-id" type:string value:"cN907xLngi"
这些field当然随着数据不同而不同,那么称之为dynamic fileds。
入es的时候,可以指定提取哪些dynamic fields,
fields=["Timestamp","Uuid","Type","Logger","Pid","Hostname","DynamicFields"]
dynamic_fields=["msg","userid"]
只要使用dynamic_fileds,就必须要在fields中指定DynamicFields。
如果没有dynamic_fileds,那么fields只能列举几个固定的属性,参照官方文档即可。
完成的列子:
[hekad]
maxprocs = 2
[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
decoder="JsonDecoder"
[JsonDecoder]
type = "SandboxDecoder"
[hekad]
maxprocs = 2
[KafkaInputExample]
type = "KafkaInput"
topic = "test"
addrs = ["localhost:9092"]
decoder="JsonDecoder"
[JsonDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/json.lua"
[JsonDecoder.config]
type = "artemis"
payload_keep = true
map_fields = true
Severity = "severity"
[ESJsonEncoder]
index = "%{Type}-%{%Y.%m.%d}"
es_index_from_timestamp = true
type_name = "%{Type}"
fields=["Timestamp","Uuid","Type","Logger","Pid","Hostname","DynamicFields"]
dynamic_fields=["msg","userid"]
raw_bytes_fields=["Payload"]
[ESJsonEncoder.field_mappings]
Timestamp = "@timestamp"
Severity = "level"
[ElasticSearchOutput]
message_matcher = "TRUE"
encoder = "ESJsonEncoder"
flush_interval = 1
结果如下,
上述就是小编为大家分享的heka从kalka中读取数据的示例分析了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流