自定义数据导入命令(data_transfer)
一、概述
在一些情况下,您需要运用到的数据可能无法以user
或event
的形式来表示,比如一些映射关系表,或者一些外部数据,如果需要运用这部分数据,则需要通过data_transfer
命令导入自定义数据到TA系统中,目前支持以下三种导入数据源:
mysql
: 远程mysql数据库ftp
: 远程FTP文件txtfile
: 本地文件
二、使用方法
数据导入的命令如下:
ta-tool data_transfer -conf <config files> [--date xxx]
传入的参数为导入表的配置文件路径,每一个表即为一个配置文件,支持多个表同时导入,支持通配符的方式,例如:
/data/config/*
或者./config/*.json
可选参数--date:可选,参数表示数据日期,时间宏会基于此基准时间进行替换,可不传,不传则默认取当前日期,格式为
YYYY-MM-DD
,时间宏的具体使用方法,可参考时间宏使用方法单个表的配置文件样例如下:
{
"parallel_num": 2,
"source": {
"type": "txtfile",
"parameter": {
"path": [
"/data/home/ta/importer_test/data/*"
],
"encoding": "UTF-8",
"column": ["*"],
"fieldDelimiter": "\t"
}
},
"target": {
"appid": "test-appid",
"table": "test_table",
"table_desc": "导入测试表",
"partition_value": "@[{yyyyMMdd}-{1day}]",
"column": [
{
"name": "col1",
"type": "timestamp",
"comment": "时间戳"
},
{
"name": "col2",
"type": "varchar"
}
]
}
}
外层参数说明
parallel_num
- 描述: 导入并发线程数,控制导入的速率
- 类型:
int
- 必选:是
- 默认值:无
source
- 描述: 导入数据源的具体参数配置
- 类型:
jsonObject
- 必选:是
- 默认值:无
target
- 描述: 导出目标表的具体参数配置
- 类型:
jsonObject
- 必选:是
- 默认值:无
source参数具体说明
type
- 描述: 导入数据源的类型,目前导入工具支持:
txtfile
,mysql
,ftp
三种导入数据源,后续会增加更多数据源的支持 - 类型:
string
- 必选:是
- 默认值:无
- 描述: 导入数据源的类型,目前导入工具支持:
parameter
- 描述: 不同数据源的具体配置,在2.2节将会有详细的介绍
- 类型:
jsonObject
- 必选:是
- 默认值:无
target参数具体说明
appid
- 描述: 导入表对应的项目appid,在TA系统后台可以查到得到
- 类型:
string
- 必选:是
- 默认值:无
table
- 描述: 导入到TA系统中的表名, 注意:表名全局不能重复,建议基于不同项目增加可以区分的前缀或后缀
- 类型:
string
- 必选:是
- 默认值:无
table_desc
- 描述: 导入表的注释,建议导入时配置该参数,方便后续查询表时明确表含义
- 类型:
string
- 必选:否
- 默认值:空
partition_val
- 描述: 导入的分区值,TA系统导入的自定义表默认都会带上分区字段
$pt
,所以在导入时必须指定导入的分区值,一般可以设定为导入的数据日期,例如:20180701
, 同时也支持时间宏替换,例如:@[{yyyyMMdd}-{1day}]
,在2.1节将会介绍具体使用方法 - 类型:
string
- 必选:是
- 默认值:无
- 描述: 导入的分区值,TA系统导入的自定义表默认都会带上分区字段
column
描述: 定义导入到TA系统中的表字段定义,包含
name
,type
,comment
3个属性值,其中name
与type
为必传字段,样例如下:[{ "name": "col1", "type": "timestamp", "comment": "时间戳" }, { "name": "col2", "type": "varchar" }]
当source端为mysql,且为整表导入时(即
column
字段为["*"]
),则target
中可以不传column
参数,导入工具会以mysql中的表结构为准。其余情况,该字段必传- 类型:
jsonArray
- 必选:否
- 默认值:mysql source端的表schema定义
2.1 时间宏使用方法
配置文件内部可以使用时间宏替换时间参数,ta-tool工具会以导入的开始时间为基准,基于时间宏的参数做时间的偏移计算,并替换掉配置文件中的时间宏,支持的时间宏格式:@[{yyyyMMdd}]
, @[{yyyyMMdd}-{nday}]
, @[{yyyyMMdd}+{nday}]
等等
yyyyMMdd
可以替换成任意可以被 JavadateFormat
解析的日期格式,例如:yyyy-MM-dd HH:mm:ss.SSS
,yyyyMMddHH000000
- n可以是任意整数,表示时间的偏移值
- day表示时间的偏移单位,可以取如下几种:
day
,hour
,minute
,week
,month
- 举例: 假设当前时间为
2018-07-01 15:13:23.234
@[{yyyyMMdd}]
替换为20180701
@[{yyyy-MM-dd}-{1day}]
替换为2018-06-31
@[{yyyyMMddHH}+{2hour}]
替换为2018070117
@[{yyyyMMddHHmm00}-{10minute}]
替换为20180701150300
2.2 导入数据源配置
本节将会介绍不同数据源的参数配置,目前支持:txtfile
, mysql
, ftp
三种导入数据源,根据数据源的不同,需要您调整source的参数
2.2.1 mysql数据源
mysql数据源通过JDBC连接器连接到远程的mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程mysql数据库,并将该SQL执行返回结果导入到TA系统的表内
配置样例
- mysql整表导入到TA系统的配置样例:
{
"parallel_num": 2,
"source": {
"type": "mysql",
"parameter": {
"username": "test",
"password": "test",
"column": ["*"],
"connection": [
{
"table": [
"test_table"
],
"jdbcUrl": [
"jdbc:mysql://mysql-ip:3306/testDb"
]
}
]
}
},
"target": {
"appid": "test-appid",
"table": "test_table_abc",
"table_desc": "mysql测试表",
"partition_value": "@[{yyyy-MM-dd}-{1day}]"
}
}
- 自定义sql导入到TA系统的配置样例:
{
"parallel_num": 1,
"source": {
"type": "mysql",
"parameter": {
"username": "test",
"password": "test",
"connection": [
{
"querySql": [
"select db_id,log_time from test_table where log_time>='@[{yyyy-MM-dd 00:00:00}-{1day}]' and log_time<'@[{yyyy-MM-dd 00:00:00}]'"
],
"jdbcUrl": [
"jdbc:mysql://mysql-ip:3306/testDb"
]
}
]
}
},
"target": {
"appid": "test-appid",
"table": "test_table_abc",
"table_desc": "mysql测试表",
"partition_value": "@[{yyyy-MM-dd}-{1day}]",
"column": [
{
"name": "db_id",
"type": "bigint",
"comment": "db序号"
},
{
"name": "log_time",
"type": "timestamp",
"comment": "时间戳"
}
]
}
}
parameter参数说明
jdbcUrl
- 描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述。 注意,jdbcUrl必须包含在connection配置单元中。一般情况下,JSON数组填写一个JDBC连接即可。
- 类型:
jsonArray
- 必选:是
- 默认值:无
username
- 描述:数据源的用户名
- 类型:
string
- 必选:是
- 默认值:无
password
- 描述:数据源指定用户名的密码
- 类型:
string
- 必选:是
- 默认值:无
table
- 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,MysqlReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
- 类型:
jsonArray
- 必选:是
- 默认值:无
column
- 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。可以使用
*
号代表默认使用所有列配置,例如[“*”]
。 - 类型:
jsonArray
- 必选:是
- 默认值:无
- 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。可以使用
where
- 描述:筛选条件,根据指定的
column
、table
、where
条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择前一天的数据进行同步,可以将where条件指定为log_time>='@[{yyyy-MM-dd 00:00:00}-{1day}]' and log_time<'@[{yyyy-MM-dd 00:00:00}]'
。注意:不可以将where条件指定为limit 10
,limit
不是SQL的合法where子句。 where条件可以有效地进行业务增量同步。如果不填写where语句,导入工具均视作同步全量数据。 - 类型:
string
- 必选:否
- 默认值:无
- 描述:筛选条件,根据指定的
querySql
- 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置参数来自定义筛选SQL。当用户配置了这一项之后,导入工具就会忽略
table
、column
这些配置参数,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据:select a,b from table_a join table_b on table_a.id = table_b.id
,当用户配置querySql时,导入工具直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。 - 类型:
string
- 必选:否
- 默认值:无
- 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置参数来自定义筛选SQL。当用户配置了这一项之后,导入工具就会忽略
2.2.2 ftp数据源
ftp数据源是读取远端FTP服务器上的文件,导入到TA的系统表内,目前FTP的使用限制及特性如下:
- 支持且仅支持读取TXT的文件,且要求TXT中schema为一张二维表
- 支持类CSV格式文件,自定义分隔符
- 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量
- 支持递归读取、支持文件名过滤
- 支持文本压缩,现有压缩格式为
zip
、gzip
、bzip2
配置样例
{
"parallel_num": 5,
"source": {
"type": "ftp",
"parameter": {
"protocol": "sftp",
"host": "127.0.0.1",
"port": 22,
"username": "xx",
"password": "xxx",
"path": [
"/home/ftp/data/testData/*"
],
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
}
],
"encoding": "UTF-8",
"fieldDelimiter": "\t"
}
},
"target": {
"appid": "test-appid",
"table": "test_table_abc",
"table_desc": "mysql测试表",
"partition_value": "@[{yyyy-MM-dd}-{1day}]",
"column": [
{
"name": "db_id",
"type": "bigint",
"comment": "db序号"
},
{
"name": "log_time",
"type": "timestamp",
"comment": "时间戳"
}
]
}
}
parameter参数说明
protocol
- 描述:ftp服务器协议,目前支持传输协议有ftp和sftp。
- 类型:
string
- 必选:是
- 默认值:无
host
- 描述:ftp服务器地址。
- 类型:
string
- 必选:是
- 默认值:无
port
- 描述:ftp服务器端口。
- 类型:
int
- 必选:否
- 默认值:若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21
connectPattern
- 描述:连接模式(主动模式或者被动模式)。该参数只在传输协议是标准ftp协议时使用,值只能为:
PORT
(主动),PASV
(被动)。 - 类型:
string
- 必选:否
- 默认值:
PASV
- 描述:连接模式(主动模式或者被动模式)。该参数只在传输协议是标准ftp协议时使用,值只能为:
username
- 描述:ftp服务器访问用户名。
- 类型:
string
- 必选:是
- 默认值:无
password
- 描述:ftp服务器访问密码。
- 类型:
string
- 必选:是
- 默认值:无
path
- 描述:远程ftp文件系统的路径信息,注意这里可以支持填写多个路径。 当指定通配符,导入工具尝试遍历出多个文件信息。例如: 指定
/data/*
,代表读取/data
目录下所有的文件。目前只支持*
号作为文件通配符。 特别需要注意的是,导入工具会将一个作业下同步的所有Text File
视作同一张数据表。用户必须自己保证所有的File
能够适配同一套schema
信息。读取文件必须保证为类CSV格式。 - 类型:
string
- 必选:是
- 默认值:无
- 描述:远程ftp文件系统的路径信息,注意这里可以支持填写多个路径。 当指定通配符,导入工具尝试遍历出多个文件信息。例如: 指定
column
描述:读取字段列表,
type
指定源数据的类型,index
指定当前列来自于文本第几列(以0开始),value
指定当前类型为常量,不从源头文件读取数据,而是根据value
值自动生成对应的列。
默认情况下,用户可以全部按照string
类型读取数据,配置如下:"column": ["*"]
用户可以指定Column字段信息,配置如下:
{ "type": "long", "index": 0 //从远程FTP文件文本第一列获取long字段 }, { "type": "string", "value": "2018-07-01 00:00:00" //生成一个常量字段作为第二列 }
对于用户指定Column信息,
type
必须填写,index/value
必须选择其一。
type
的取值范围是:long
、double
、string
、boolean
- 类型:
jsonArray
- 必选:是
- 默认值:无
fieldDelimiter
- 描述:读取的字段分隔符
- 类型:
string
- 必选:是
- 默认值:
,
compress
- 描述:文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为
zip
、gzip
、bzip2
。 - 类型:
string
- 必选:否
- 默认值:没有压缩
- 描述:文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为
encoding
- 描述:读取文件的编码配置。
- 类型:
string
- 必选:否
- 默认值:
utf-8
skipHeader
- 描述:类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过。
- 类型:
boolean
- 必选:否
- 默认值:
false
nullFormat
- 描述:文本文件中无法使用标准字符串定义
null
(空指针),ta-tool
提供nullFormat
定义哪些字符串可以表示为null
。 例如如果用户配置:nullFormat:"\N"
,那么如果源头数据是"\N"
,ta-tool
视作null
字段。 - 类型:
string
- 必选:否
- 默认值:
\N
- 描述:文本文件中无法使用标准字符串定义
2.2.3 txtfile数据源
txtfile数据源是读本地服务器上的文件,导入到TA的系统表内,目前txtfile的使用限制及特性如下:
- 支持且仅支持读取TXT的文件,且要求TXT中schema为一张二维表
- 支持类CSV格式文件,自定义分隔符
- 支持多种类型数据读取(使用
string
表示),支持列裁剪,支持列常量 - 支持递归读取、支持文件名过滤
- 支持文本压缩,现有压缩格式为
zip
、gzip
、bzip2
配置样例
{
"parallel_num": 5,
"source": {
"type": "txtfile",
"parameter": {
"path": [
"/home/ftp/data/testData/*"
],
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
}
],
"encoding": "UTF-8",
"fieldDelimiter": "\t"
}
},
"target": {
"appid": "test-appid",
"table": "test_table_abc",
"table_desc": "mysql测试表",
"partition_value": "@[{yyyy-MM-dd}-{1day}]",
"column": [
{
"name": "db_id",
"type": "bigint",
"comment": "db序号"
},
{
"name": "log_time",
"type": "timestamp",
"comment": "时间戳"
}
]
}
}
parameter参数说明
path
- 描述:本地文件系统的路径信息,注意这里可以支持填写多个路径。 当指定通配符,导入工具尝试遍历出多个文件信息。例如: 指定
/data/*
,代表读取/data
目录下所有的文件。目前只支持*
号作为文件通配符。 特别需要注意的是,导入工具会将一个作业下同步的所有Text File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件必须保证为类CSV格式。 - 类型:
string
- 必选:是
- 默认值:无
- 描述:本地文件系统的路径信息,注意这里可以支持填写多个路径。 当指定通配符,导入工具尝试遍历出多个文件信息。例如: 指定
column
描述:读取字段列表,
type
指定源数据的类型,index
指定当前列来自于文本第几列(以0开始),value
指定当前类型为常量,不从源头文件读取数据,而是根据value
值自动生成对应的列。
默认情况下,用户可以全部按照string
类型读取数据,配置如下:"column": ["*"]
用户可以指定Column字段信息,配置如下:
{ "type": "long", "index": 0 //从远程FTP文件文本第一列获取long字段 }, { "type": "string", "value": "2018-07-01 00:00:00" //生成一个常量字段作为第二列 }
对于用户指定Column信息,
type
必须填写,index/value
必须选择其一。 type的取值范围是:long
、double
、string
、boolean
- 类型:
jsonArray
- 必选:是
- 默认值:无
fieldDelimiter
- 描述:读取的字段分隔符
- 类型:
string
- 必选:是
- 默认值:
,
compress
- 描述:文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为
zip
、gzip
、bzip2
。 - 类型:
string
- 必选:否
- 默认值:没有压缩
- 描述:文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为
encoding
- 描述:读取文件的编码配置。
- 类型:
string
- 必选:否
- 默认值:
utf-8
skipHeader
- 描述:类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过。
- 类型:
boolean
- 必选:否
- 默认值:
false
nullFormat
- 描述:文本文件中无法使用标准字符串定义
null
(空指针),ta-tool
提供nullFormat
定义哪些字符串可以表示为null
。 例如如果用户配置:nullFormat:"\N"
,那么如果源头数据是"\N"
,ta-tool视作null
字段。 - 类型:
string
- 必选:否
- 默认值:
\N
- 描述:文本文件中无法使用标准字符串定义