LogBus使用指南
本节主要介绍数据传输工具LogBus的使用方法:
LogBus Windows版本请阅读LogBus Windows版本使用指南
在开始对接前,您需要先阅读数据规则,在熟悉TA的数据格式与数据规则后,再阅读本指南进行对接。
LogBus上传的数据必须遵循TA的数据格式
下载LogBus
最新版本为: 1.5.12
更新时间为: 2020-08-28
版本升级说明:
1.5.0以上版本:
先执行
./logbus stop
命令停止LogBus,等停止完毕后再执行./logbus update
命令升级至最新版本如果您使用的是1.5.0之前版本,需要升级至新版本,请联系TA工作人员
一、LogBus简介
LogBus工具主要用于将后端的日志数据实时地导入到TA后台,其核心工作原理类似于Flume,会监控服务器日志目录下的文件流,当目录下任意日志文件有新数据产生时,会对新数据进行校验,并实时发送至TA后台。
以下几类用户建议使用LogBus接入数据:
- 使用服务端SDK的用户,通过LogBus上传数据
- 对数据的准确性及维度要求较高,仅通过客户端SDK无法满足数据需求,或不方便接入客户端SDK
- 不想自己开发后端数据推送流程
- 需要传输大批量历史数据
二、使用前数据准备
1.首先将需要传输的数据进行ETL转换成TA的数据格式,并写到本地或传输至Kafka集群,如果使用的是服务端SDK的写入本地文件或Kafka的consumer,则数据已经是正确的格式,无需再进行转换。
2.确定上传数据的文件存放的目录,或者Kafka的地址与topic,并配置LogBus的相关配置,LogBus会监控文件目录下的文件变更(监控文件新建或tail已有文件),或者订阅Kafka中的数据。
3.请勿对存放于监控目录下且已经上传的数据日志直接进行重命名,重命名日志相当于新建文件,LogBus将可能会重新上传这些文件,造成数据重复。4.由于LogBus数据传输组件中包含数据缓冲区,LogBus目录占磁盘可能会稍大,因此请确保LogBus安装节点的磁盘空间充足,每向一个项目(即增加一个APP_ID)传输数据需预留至少10G的存储空间。
三、LogBus的安装与升级
3.1 安装LogBus
1.下载LogBus压缩包,并解压。
2.解压后的目录结构:
- bin:启动程序文件夹
- conf:配置文件文件夹
- lib:功能文件夹
3.2 升级LogBus
如果您使用的是1.5.0及之后的版本,可以先使用./logbus stop
命令停止LogBus,再执行./logbus update
命令升级LogBus至最新版本,再重启LogBus
四、LogBus的参数配置
1.进入解压后的conf
目录,里面有一个配置文件logBus.conf.Template
,该文件包含LogBus所有的配置参数,首次使用时可将其重命名为logBus.conf
。
2.打开logBus.conf
文件进行相关参数配置
4.1 项目与数据源配置(必须配置)
- 项目APP_ID
##APPID来自tga官网的token,请在TA后台的项目配置页面获取接入项目的APPID并填入此处,多个APPID通过","分割
APPID=APPID_1,APPID_2
- 监控文件配置(请选择其中一种,必须配置)
4.1.1.数据来源是本地文件时
##LogBus读取的数据文件所在的路径及文件名(文件名支持模糊匹配), 需要有读权限
##不同APPID用逗号隔开,相同APPID不同目录用空格隔开
##TAIL_FILE的文件名支持java标准的正则表达式
TAIL_FILE=/path1/dir*/log.* /path2/DATE{YYYYMMDD}/txt.*,/path3/txt.*
TAIL_FILE支持对多路径下的多分目录中的多个文件进行监控,下图是
对应参数配置为:
APPID=APPID1,APPID2
TAIL_FILE=/root/log_dir1/dir_*/log.* /root/log_dir2/log/DATE{YYYYMMDD}/log.*,/test_log/*
具体规则如下:
- 同一APP_ID的多个监控路径通过空格分割
- 不同APP_ID的监控路径通过逗号","分割,且按逗号分割后监控路径对应APP_ID
- 监控路径中的分目录(即文件所在的目录)支持通过日期格式或正则表达式监控
- 文件名支持使用正则表达式监控
日期格式分目录的规则:
日期格式分目录需以DATE{}
括起来日期模板,DATE必须大写,以下举例几种可识别的日期模板以及对应监控的文件样例,但不限于此,日期模板只需是标准日期格式即可。
/root/logbus_data/DATE{YYYY-MM-DD}/log.*
---> /root/logbus_data/2019-01-01/log.1/root/logbus_data/DATE{YYMMDD}/log.*
---> /root/logbus_data/190101/log.1/root/logbus_data/DATE{MM_DD_YYYY}/log.*
---> /root/logbus_data/01_01_2019/log.1/root/logbus_data/DATE{MM*DD}/log.*
---> /root/logbus_data/01*01/log.1
4.1.2.数据来源为kafka时
从1.5.2版本后,参数KAFKA_TOPICS
不再支持正则表达式,需监控多个topic时,可使用空格分隔各topic;如存在多APP_ID,则使用半角逗号分割各APP_ID监控的topic。参数KAFKA_GROUPID
必须唯一。1.5.3版本新增参数KAFKA_OFFSET_RESET
,可设置Kafka的kafka.consumer.auto.offset.reset
参数,可取值为earliest
与latest
,默认设置为earliest
。
注意:数据源的Kafka版本必须在 0.10.1.0 或更高
单APP_ID样例:
APPID=appid1
######kafka 配置
#KAFKA_GROUPID=tga.group
#KAFKA_SERVERS=localhost:9092
#KAFKA_TOPICS=topic1 topic2
#KAFKA_OFFSET_RESET=earliest
多APP_ID样例:
APPID=appid1,appid2
######kafka 配置
#KAFKA_GROUPID=tga.group
#KAFKA_SERVERS=localhost:9092
#KAFKA_TOPICS=topic1 topic2,topic3 topic4
#KAFKA_OFFSET_RESET=earliest
4.2 传输参数配置(必须配置)
##传输设置
##传送的url
##http传输请使用
PUSH_URL=http://receiver.ta.thinkingdata.cn/logbus
##如果您使用的是私有化部署服务,请修改传输URL为:http://数据采集地址/logbus
##是否开启检查appid,默认关闭
#IS_CHECK_APPID=false
##每次传输的最大数量
#BATCH=10000
##最少多久传一次(单位:秒)
#INTERVAL_SECONDS=600
##传输线程数,默认单线程,更详细的配置请见官网,必须在上传数据前进行设置,请勿在设置后进行修改
#NUMTHREAD=1
##是否在每条数据中增加uuid属性(开启会降低传输效率)
#IS_ADD_UUID=true
##文件传输的压缩格式:gzip,lzo,lz4,snappy,none
#COMPRESS_FORMAT=none
如需修改 NUMTHREAD 参数,必须在首次启动LogBus前进行设置,请勿在启动LogBus后进行修改
4.3 Flume内存参数配置(可选配置)
# flume管道容量设置
# 管道容量,这里需要根据部署电脑的配置视情况而定。
CAPACITY=1000000
# 管道到sink的传输量,需大于BATCH参数
TRANSACTION_CAPACITY=10000
##指定flume启动的最大内存,单位为M。默认根据启动线程数调整,线程不超过7个并且项目不超过4个使用1024M内存,否则为2048M
#MAX_MEMORY=2048
# flume的channel设置,有file和memory两种(可选,默认使用file)
# CHANNEL_TYPE=file
# CHANNEL_TYPE=memory
4.3.1 CHANNEL_TYPE的设置说明
1.数据来源为kafka时,请不要修改CHANNEL_TYPE参数,使用默认的file已保证数据准确性
2.数据来源为本地文件时,可以修改CHANNEL_TYPE参数为memory加快数据传输效率,占用空间较少,同时保障数据的可靠性以及唯一性。
3.如需修改CHANNEL_TYPE参数,请保证当前源文件的数据已上传完成并移除源文件,否则会出现数据重复上传或数据丢失。
4.使用memory时需要将CAPACITY减小(建议设置为100000,不减少会加大内存占用,可能达不到加快传输的效果)。
4.4 监控文件删除配置(可选配置)
# 监控目录文件删除,去除注释即为启动删除文件功能
# 只能以按天(day)或按小时(hour)删除
# UNIT_REMOVE=hour
# 删除多久之前的文件
# OFFSET_REMOVE=20
# 删除已经上传的监控文件,每隔多少分钟删除
# FREQUENCY_REMOVE=60
4.5 自定义解析器(可选配置)
1.5.9版本开始支持客户自定义数据解析器,用于原数据格式与TA数据格式不一致时自定义转换格式。
详情如下:
- 添加如下依赖
Maven:
<dependency>
<groupId>cn.thinkingdata.ta</groupId>
<artifactId>logbus-custom-interceptor</artifactId>
<version>1.0.1</version>
</dependency>
Gradle:
// https://mvnrepository.com/artifact/cn.thinkingdata.ta/logbus-custom-interceptor
compile group: 'cn.thinkingdata.ta', name: 'logbus-custom-interceptor', version: '1.0.1'
- 实现CustomInterceptor接口中的transFrom方法。方法中的第一个参数为原数据内容,第二个参数为sourceName(用于需要区分APP_ID时使用。根据APP_ID区分,比如只配置一个APP_ID,sourceName为r1,如果配置多个APP_ID,sourceName按顺序依次为r1,r2等)。
public interface CustomInterceptor {
TaDataDo transFrom(String var1, String var2);
}
实现的接口方法例如:
public TaDataDo transFrom(String s, String s1) {
return JSONObject.parseObject(s, TaDataDo.class);
}
- 配置下面两个字段即可
##下面两个字段为使用自定义解析器(必须将两个字段都设置才可使用)
##自定义解析器全限定名(包名+类名),不设置则使用默认解析器
#CUSTOM_INTERCEPTOR=cn.thinkingdata.demo.DemoCustomInterceptor
##自定义解析器jar的绝对路径(包括jar包文件名)
#INTERCEPTOR_PATH=/var/interceptor/custom-interceptor-1.0-SNAPSHOT.jar
4.6 配置文件示例
##################################################################################
## thinkingdata数据分析平台传输工具logBus配置文件
##非注释的为必填参数,注释的为选填参数,可以根据你自身的情况进行
##合适的配置
##环境要求:java8+,更详细的要求请详见tga官网
##http://doc.thinkinggame.cn/tdamanual/installation/logbus_installation.html
##################################################################################
##APPID来自tga官网的token
##不同APPID用逗号隔开
APPID=from_tga1,from_tga2
#-----------------------------------source----------------------------------------
######file-source
##LogBus读取的数据文件所在的路径及文件名(文件名支持模糊匹配), 需要有读权限
##不同APPID用逗号隔开,相同APPID不同目录用空格隔开
##TAIL_FILE的文件名支持java标准的正则表达式
TAIL_FILE=/path1/log.* /path2/txt.*,/path3/log.* /path4/log.* /path5/txt.*
######kafka-source
#KAFKA_GROUPID=tga.flume
#KAFKA_SERVERS=
#KAFKA_TOPICS=
#KAFKA_OFFSET_RESET=earliest
#-----------------------------------channel---------------------------------------
##flume管道容量设置
##管道容量,这里需要根据部署电脑的配置视情况而定。
#CAPACITY=1000000
##管道到sink的传输量,需大于参数BATCH
#TRANSACTION_CAPACITY=10000
##flume的channel设置,有file和memory两种(可选)
#CHANNEL_TYPE=file
#------------------------------------sink-----------------------------------------
##传输设置
##传送的url
##如果您使用的是私有化部署服务,请修改传输URL为:http://数据采集地址/logbus
##PUSH_URL=http://receiver.ta.thinkingdata.cn/logbus
PUSH_URL=http://${数据采集地址}/logbus
##每次传输的最大数量
#BATCH=10000
##最少多久传一次(单位:秒)
#INTERVAL_SECONDS=600
##传输线程数,默认单线程
#NUMTHREAD=1
##### http传输
##文件传输的压缩格式:gzip,lzo,lz4,snappy,none
#COMPRESS_FORMAT=none
#------------------------------------other-----------------------------------------
##监控目录下文件删除,打开注释(必须将下面两个字段都打开)即为启动删除文件功能,每一个小时启动一次文件删除程序
##按unit删除offset之前的文件
##删除多久之前的文件
#OFFSET_REMOVE=
##只接收按天(day)或小时(hour)删除
#UNIT_REMOVE=
#------------------------------------interceptor-----------------------------------
##下面两个字段为使用自定义解析器(必须将两个字段都设置才可使用)
##自定义解析器全限定名,不设置则使用默认解析器
#CUSTOM_INTERCEPTOR=
##自定义解析器jar位置
#INTERCEPTOR_PATH=
五、启动LogBus
在首次启动前请进行以下检查:
1.检查java版本
进入bin
目录,里面会有两个脚本,check_java
与logbus
其中check_java
用于检测java版本是否满足要求,执行脚本,若java版本不满足会出现Java version is less than 1.8
或Can't find java, please install jre first.
等提示
您可以更新JDK版本或查看下一节中的内容为LogBus单独安装JDK
2.安装LogBus的独立JDK
如果LogBus部署节点,由于环境关系,JDK版本不满足LogBus需求,又无法替换成满足LogBus的JDK版本。可以使用此功能。
进入bin
目录,里面会有install_logbus_jdk.sh
。
运行此脚本会在LogBus工作目录新增出java目录。LogBus会默认使用这个目录下的JDK环境。
3.完成logBus.conf的配置,并运行参数环境检查命令
logBus.conf的配置请参考配置LogBus一节
配置完成后运行 env
命令,检查配置参数是否正确
./logbus env
如果输出红色异常信息,说明配置有问题,需要重新修改,直到配置文件没有异常的提示,如上图所示。
当您修改了logBus.conf的配置后,需要重启LogBus以使新配置生效
4.启动LogBus
./logbus start
启动成功会有上图中的提示,失败则会提示异常信息,如下图
六、LogBus命令详解
6.1 帮助信息
不带参数或--help或-h,将会显示如下的帮助信息
主要介绍LogBus的命令:
usage: logbus <命令|辅助命令> [选项]
命令:
start 启动logBus.
restart 重启logBus.
stop 安全退出logBus.
reset 重置logBus读取记录.
stop_atOnce 强制退出logBus.
辅助命令:
env 运行环境校验.
server [-url <url>|-url <url> -appid <appid>] 测试接收端网络情况
data_debug 配置目录下文件的数据格式明细校验.
show_conf 显示当前logBus配置信息.
version 显示版本号.
update 更新logbus到最新版本.
progress 显示当前传输文件进度.
选项:
-appid <appid> 项目appid
-h,--help 显示帮助文档并退出.
-path <path> 指定测试文件的绝对路径
-url <url> 指定测试的url地址
示例:
./logbus start 启动logBus.
./logbus stop 安全退出logBus.
./logbus restart 重启logBus.
./logbus data_debug 配置目录下文件的数据格式明细校验.
./logbus server -url http://${接收端地址}/logbus -appid ***** 测试接收端网络情况
6.2 文件数据格式检查data_debug
当您首次使用LogBus时,我们建议您在正式上传数据前,先对您的数据进行格式校验,数据必须符合数据格式规范,您可以使用data_debug
命令来进行数据格式校验,如下:
此功能会消耗集群资源,每次限定10000条,每个文件平分校验量,从文件头优先校验。
./logbus data_debug
当数据格式正确时,将会提示数据正确,如下图:
如果数据格式存在问题,则会警告格式错误,并简述格式的错误点:
6.3 传输通道检查server_uri
在您完成格式校验后,您还需要检查数据通道是否打通,您可以使用server_uri
命令进行校验,在校验的同时可以输入您在TA平台收到的APP_ID,要注意APP_ID与您的项目是绑定的,在输入前请务必保证您输入的APP_ID对应您的项目
./logbus server -url http://${接收端地址}/logbus -appid ${appid}
如果传输通道已经打通,会提示可以使用:
错误情况如下:
6.4 展示配置信息show_conf
您可以使用show_conf
命令来查看LogBus的配置信息,展示内容如下图所示:
./logbus show_conf
6.5 启动环境检查env
您可以使用env
进行启动环境的检查,如果输出的信息后面带有星号,就说明配置有问题,需要重新修改,直至没有星号提示。
./logbus env
6.6 启动start
当您完成格式的校验、数据通道的检查以及环境检查后,即可启动LogBus进行数据的上传,LogBus将会自动检测您的文件是否有新数据写入,如果有新数据,则将数据进行上传。
./logbus start
6.7 停止stop
如果您想要停止LogBus。请使用stop
命令,该命需要花费一定时间,但不会有数据损失。
./logbus stop
6.8 停止stop_atOnce
如果您想要立刻停止LogBus,请使用stop_atOnce
命令,该命令可能导致数据丢失。
./logbus stop_atOnce
6.9 重启restart
您可以使用restart
命令重启LogBus,适合在修改配置参数后使新配置生效。
./logbus restart
6.10 重置reset
使用reset
将会重置LogBus,请务必谨慎使用该命令,一旦使用将会清空文件传输记录,LogBus将会重新上传所有数据。如果您在不明确的条件下使用该命令,可能导致您的数据出现重复。建议在与TA工作人员沟通后再进行使用。
./logbus reset
使用重置命令后,需要执行start
重新开始传输数据
LogBus 1.5.0 版本后,添加如下确认信息,确认后才会开始重置LogBus
6.11 查看版本号version
如果您想要了解您所使用的LogBus的版本号,可以使用version
命令,如果您的LogBus没有该命令,则您所使用的版本属于早期版本
./logbus version
6.12 升级LogBus版本update
LogBus 1.5.0 版本,新增了在线更新版本的功能,执行该命令可升级LogBus至最新版本
./logbus update
6.13 查看当前上传进度progress
LogBus 1.5.9 版本,新增了查看当前上传进度的功能,执行该命令可以查询当前传输进度
./logbus progress
6.14 检查logbus常见问题doctor
LogBus 1.5.12版本,新增了检查logbus常见问题的命令,执行该命令可以查看当前logbus是否存在问题
./logbus doctor
七、ChangeLog
版本1.5.12 --- 2020/08/28
优化:
- 保障数据来源为Kafka时使用MemoryChannel的数据可靠性,请参考memory的使用说明
- 解决无法移除appid的问题
- 优化env命令,全面检查配置属性并优化显示文案
新增:
- 增加按照指定属性顺序发送
- 增加检查logbus部分常见问题的命令doctor
- 增加#event_id和#first_check_id
版本1.5.11 --- 2020/06/01
优化:
- 保障数据来源为本地文件时使用MemoryChannel的数据可靠性,请参考memory的使用说明
新增:
- 新增可根据数据中的指定属性映射APP_ID。
- 增加LogBus守护进程健康检查。
- 增加flume进程状态监控。
- 数据中可添加uuid属性。
- LogBus增加删除30天之外的日志。
- 增加检查APP_ID。
废弃:
- 废弃旧版本的ftp传输方式。
版本1.5.10 --- 2020/03/31
修复:
- 修复准确路径匹配失败的Bug。
优化:
- 升级fastjson版本至1.2.67,修复反序列化和SSRF漏洞。
版本1.5.9 --- 2020/03/25
新增:
- 增加自定义解析器。
- 增加查看当前传输进度命令progress。
优化:
- taildir模式下支持匹配任意一层模糊路径。
- 优化对mac系统的支持。
版本1.5.8 --- 2020/02/20
新增:
- 针对因网络震荡引起的丢包问题,新增重试策略。
版本1.5.7 --- 2020/02/13
优化:
- 优化USER数据通道的分发策略。
版本1.5.6 --- 2020/01/03
优化:
- 优化pid文件,状态锁文件存放位置。
- 优化并发数在多项目配置时,也允许调大。
- 优化JVM参数。
- 优化读取本地文件时,跳过隐藏文件。
新增:
- 支持独立JDK方式。
- 守护进程添加磁盘使用量扫描功能,磁盘不足主动停止LogBus。
- 新增data_debug功能,校验配置目录下文件的内容的详细错误。
- 新增kafka 数据源的offset位置记录。
废弃:
- 废弃旧版本format_check功能。
版本1.5.5 --- 2019/09/23
优化:
- 优化JDK检查脚本,支持 JDK 10 以上版本校验。
- 优化内部启动顺序。
- 优化flume运行环境,避免环境冲突。
- 添加下载进度条显示功能。
- 优化服务端ip白名单提示。
版本1.5.4 --- 2019/06/25
优化:
- 优化配置文件参数校验逻辑和操作说明文案。
- 读取文件数超过限制最大数时,新增自动停止LogBus的逻辑。
版本1.5.3.1 --- 2019/05/22
修复:
- 修复网络异常情况下数据传输策略问题,由LogBus中断传输改为一直重试。
版本1.5.3 --- 2019/04/25
优化:
- 优化大量文件同时传输时的数据传输逻辑
- 优化LogBus数据传输日志,分为info和error两个日志,方便监控LogBus运行状态
- 升级基础组件flume到最新版本1.9.0
改动:
- 新增kafka对接端offset配置:设置Kafka的
kafka.consumer.auto.offset.reset
参数,可取值为earliest
与latest
,默认设置为earliest
版本1.5.2.2 --- 2019/04/10
修复:
- 修复部分系统兼容性问题
- 修复打开文件最大数问题
- 修复了在某些极端情况下,position文件异常问题
版本1.5.2.1 --- 2019/03/29
修复:
- 修复了在某些极端情况下,LogBus运行异常的问题
版本1.5.2 --- 2019/03/14
新特性:
- Kafka topic支持多APP_ID:使用多APP_ID可对多个Kafka topic进行监控(详情见Kafka相关参数的配置)
版本1.5.1 --- 2019/03/02
新特性:
支持https协议:传输地址PUSH_URL参数支持https协议
支持分目录监控:对(多)目录下多个分目录中的文件进行监控(详情见TAIL_FILE参数的配置),支持通过日期模板以及正则表达式的方式进行配置
版本1.5.0 --- 2018/12/26
新特性:
支持多APP_ID:支持在同一LogBus中向多个项目(多APP_ID)传输数据,使用多APP_ID的同时可对多个日志文件目录进行监控
支持在线更新命令:新增
update
命令,执行该命令可升级LogBus至最新版本
优化:
- 加入执行
reset
命令时的提示
版本1.4.3 --- 2018/11/19
新特性:
- 多文件目录监控:支持对多个日志文件目录进行监控(详情见
TAIL_FILE
参数的配置),同时参数FILE_DIR
和FILE_PATTERN
已弃用,从老版本升级必须对TAIL_FILE
进行配置
改动:
- flume监控改为自定义的CustomMonitor,因此不需配置FM_PORT参数(此参数已弃用)
优化:
- 修复了检测java版本10以上报错的问题
版本1.4.2 --- 2018/09/03
新特性:
- 新增数据传输方式:新增ftp传输方式
版本1.4.0 --- 2018/07/30
新特性:
多实例:同一台服务器可以部署多个LogBus。
只需安装多个LogBus工具,放置在不同的目录,并对每个LogBus配置各自的配置文件即可使用。
多线程传输:实现多线程安全。
通过修改配置文件中的参数
NUMTHREAD
设置线程个数sink端支持多种压缩格式:gzip、lz4、lzo、snappy、不压缩(none)
压缩方式从左往右,压缩比依次降低,请根据网络环境及服务器性能进行选择。
改动:
- 在首次启动或修改配置文件后,需要调用
env
命令使配置文件生效,才能使LogBus正常工作。
优化:
- 优化启动前的检查提示文案。
版本1.3.5 --- 2018/07/18
优化:
- 优化文件格式检查命令输出提示。
- 优化文件传输提示输出。
- checkpoint加入一个备份内容,防止checkpoint频繁读写错误。
- 增加channel水位控制,使其不会出现channel full的警告。
- 增加sink端网络socktimeout,设定为60s。
- 增加LogBus监控,发生sink卡死的时候,自动重启。
版本1.3.4 --- 2018/06/08
改动:
- 数据过滤方面:只过滤空行和非json数据。
- 文件删除功能由原来的每天定点删除,变为每隔一段时间删除。
配置文件:
- 配置文件格式优化,主要将配置文件归为source、channel、sink和others四个部分。
- 新增FREQUENCY_REMOVE参数,用于每隔一段时间删除已上传的目录文件,单位:分。
- 去除TIME_REMOVE参数。
新功能:
- 增加自动化工具(主要是针对ansible)启动的优化脚本,放在bin/automation目录下,主要有启动
start
,停止stop
,立刻停止stop_atOnce
三个命令。
性能优化:
- 优化启动所需的内存,减低内存需求。
版本1.3 --- 2018/04/21
- 增加Kafka数据源的支持
- 修复了已知的Bug
版本1.0 --- 2018/03/29
- LogBus发布