自定义数据导入命令(data_transfer)

一、概述

在一些情况下,您需要运用到的数据可能无法以userevent的形式来表示,比如一些映射关系表,或者一些外部数据,如果需要运用这部分数据,则需要通过data_transfer命令导入自定义数据到TA系统中,目前支持以下三种导入数据源:

  • mysql: 远程mysql数据库

  • ftp: 远程FTP文件

  • txtfile: 本地文件

二、使用方法

数据导入的命令如下:

ta-tool data_transfer -conf <config files> [--date xxx]
  • 传入的参数为导入表的配置文件路径,每一个表即为一个配置文件,支持多个表同时导入,支持通配符的方式,例如:/data/config/*或者 ./config/*.json

  • 可选参数--date:可选,参数表示数据日期,时间宏会基于此基准时间进行替换,可不传,不传则默认取当前日期,格式为YYYY-MM-DD,时间宏的具体使用方法,可参考时间宏使用方法

  • 单个表的配置文件样例如下:

{
"parallel_num": 2,
"source": {
    "type": "txtfile",
    "parameter": {
        "path": [
        "/data/home/ta/importer_test/data/*"
        ],
    "encoding": "UTF-8",
    "column": ["*"],
    "fieldDelimiter": "\t"
    }
},
"target": {
   "appid": "test-appid",
   "table": "test_table",
   "table_desc": "导入测试表",
   "partition_value": "@[{yyyyMMdd}-{1day}]",
   "column": [
      {
        "name": "col1",
        "type": "timestamp",
        "comment": "时间戳"
      },
      {
        "name": "col2",
        "type": "varchar"
      }
    ]
  }
}
  • 外层参数说明

  • parallel_num

    • 描述: 导入并发线程数,控制导入的速率
    • 类型: int
    • 必选:
    • 默认值:无
  • source

    • 描述: 导入数据源的具体参数配置
    • 类型: jsonObject
    • 必选:
    • 默认值:无
  • target

    • 描述: 导出目标表的具体参数配置
    • 类型: jsonObject
    • 必选:
    • 默认值:无
  • source参数具体说明

  • type

    • 描述: 导入数据源的类型,目前导入工具支持:txtfile, mysql, ftp 三种导入数据源,后续会增加更多数据源的支持
    • 类型: string
    • 必选:
    • 默认值:无
  • parameter

  • target参数具体说明

  • appid

    • 描述: 导入表对应的项目appid,在TA系统后台可以查到得到
    • 类型: string
    • 必选:
    • 默认值:无
  • table

    • 描述: 导入到TA系统中的表名, 注意:表名全局不能重复,建议基于不同项目增加可以区分的前缀或后缀
    • 类型: string
    • 必选:
    • 默认值:无
  • table_desc

    • 描述: 导入表的注释,建议导入时配置该参数,方便后续查询表时明确表含义
    • 类型: string
    • 必选:否
    • 默认值:空
  • partition_val

    • 描述: 导入的分区值,TA系统导入的自定义表默认都会带上分区字段 $pt ,所以在导入时必须指定导入的分区值,一般可以设定为导入的数据日期,例如: 20180701, 同时也支持时间宏替换,例如:@[{yyyyMMdd}-{1day}],在2.1节将会介绍具体使用方法
    • 类型: string
    • 必选:
    • 默认值:无
  • column

    • 描述: 定义导入到TA系统中的表字段定义,包含name, type, comment 3个属性值,其中nametype为必传字段,样例如下:

      [{
            "name": "col1",
             "type": "timestamp",
            "comment": "时间戳"
         },
        {
            "name": "col2",
            "type": "varchar"
         }]
      

      当source端为mysql,且为整表导入时(即column字段为["*"]),则target中可以不传column参数,导入工具会以mysql中的表结构为准。其余情况,该字段必传

    • 类型: jsonArray
    • 必选:否
    • 默认值:mysql source端的表schema定义

2.1 时间宏使用方法

配置文件内部可以使用时间宏替换时间参数,ta-tool工具会以导入的开始时间为基准,基于时间宏的参数做时间的偏移计算,并替换掉配置文件中的时间宏,支持的时间宏格式:@[{yyyyMMdd}], @[{yyyyMMdd}-{nday}], @[{yyyyMMdd}+{nday}]等等

  • yyyyMMdd可以替换成任意可以被 Java dateFormat解析的日期格式,例如:yyyy-MM-dd HH:mm:ss.SSS, yyyyMMddHH000000
  • n可以是任意整数,表示时间的偏移值
  • day表示时间的偏移单位,可以取如下几种: day, hour, minute, week, month
  • 举例: 假设当前时间为2018-07-01 15:13:23.234
    • @[{yyyyMMdd}] 替换为 20180701
    • @[{yyyy-MM-dd}-{1day}] 替换为 2018-06-31
    • @[{yyyyMMddHH}+{2hour}] 替换为 2018070117
    • @[{yyyyMMddHHmm00}-{10minute}] 替换为 20180701150300

2.2 导入数据源配置

本节将会介绍不同数据源的参数配置,目前支持:txtfile, mysql, ftp 三种导入数据源,根据数据源的不同,需要您调整source的参数

2.2.1 mysql数据源

mysql数据源通过JDBC连接器连接到远程的mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程mysql数据库,并将该SQL执行返回结果导入到TA系统的表内

配置样例

  • mysql整表导入到TA系统的配置样例:
{
  "parallel_num": 2,
  "source": {
    "type": "mysql",
    "parameter": {
      "username": "test",
      "password": "test",
      "column": ["*"],
      "connection": [
        {
          "table": [
            "test_table"
          ],
          "jdbcUrl": [
            "jdbc:mysql://mysql-ip:3306/testDb"
          ]
        }
      ]
    }
  },
  "target": {
    "appid": "test-appid",
    "table": "test_table_abc",
    "table_desc": "mysql测试表",
    "partition_value": "@[{yyyy-MM-dd}-{1day}]"
  }
}
  • 自定义sql导入到TA系统的配置样例:
{
  "parallel_num": 1,
  "source": {
    "type": "mysql",
    "parameter": {
      "username": "test",
      "password": "test",
      "connection": [
        {
          "querySql": [
            "select db_id,log_time from test_table where log_time>='@[{yyyy-MM-dd 00:00:00}-{1day}]' and log_time<'@[{yyyy-MM-dd 00:00:00}]'"
          ],
          "jdbcUrl": [
            "jdbc:mysql://mysql-ip:3306/testDb"
          ]
        }
      ]
    }
  },
  "target": {
    "appid": "test-appid",
    "table": "test_table_abc",
    "table_desc": "mysql测试表",
    "partition_value": "@[{yyyy-MM-dd}-{1day}]",
    "column": [
      {
        "name": "db_id",
        "type": "bigint",
        "comment": "db序号"
      },
      {
        "name": "log_time",
        "type": "timestamp",
        "comment": "时间戳"
      }
    ]
  }
}
  • parameter参数说明

  • jdbcUrl

    • 描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述。 注意,jdbcUrl必须包含在connection配置单元中。一般情况下,JSON数组填写一个JDBC连接即可。
    • 类型: jsonArray
    • 必选:
    • 默认值:无
  • username

    • 描述:数据源的用户名
    • 类型: string
    • 必选:
    • 默认值:无
  • password

    • 描述:数据源指定用户名的密码
    • 类型: string
    • 必选:
    • 默认值:无
  • table

    • 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,MysqlReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
    • 类型: jsonArray
    • 必选:
    • 默认值:无
  • column

    • 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。可以使用*号代表默认使用所有列配置,例如[“*”]
    • 类型: jsonArray
    • 必选:
    • 默认值:无
  • where

    • 描述:筛选条件,根据指定的columntablewhere条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择前一天的数据进行同步,可以将where条件指定为log_time>='@[{yyyy-MM-dd 00:00:00}-{1day}]' and log_time<'@[{yyyy-MM-dd 00:00:00}]' 。注意:不可以将where条件指定为limit 10limit不是SQL的合法where子句。 where条件可以有效地进行业务增量同步。如果不填写where语句,导入工具均视作同步全量数据。
    • 类型: string
    • 必选:否
    • 默认值:无
  • querySql

    • 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置参数来自定义筛选SQL。当用户配置了这一项之后,导入工具就会忽略tablecolumn这些配置参数,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据: select a,b from table_a join table_b on table_a.id = table_b.id,当用户配置querySql时,导入工具直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。
    • 类型: string
    • 必选:否
    • 默认值:无

2.2.2 ftp数据源

ftp数据源是读取远端FTP服务器上的文件,导入到TA的系统表内,目前FTP的使用限制及特性如下:

  1. 支持且仅支持读取TXT的文件,且要求TXT中schema为一张二维表
  2. 支持类CSV格式文件,自定义分隔符
  3. 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量
  4. 支持递归读取、支持文件名过滤
  5. 支持文本压缩,现有压缩格式为zipgzipbzip2

配置样例

{
  "parallel_num": 5,
  "source": {
    "type": "ftp",
    "parameter": {
      "protocol": "sftp",
      "host": "127.0.0.1",
      "port": 22,
      "username": "xx",
      "password": "xxx",
      "path": [
        "/home/ftp/data/testData/*"
      ],
      "column": [
        {
          "index": 0,
          "type": "long"
        },
        {
          "index": 1,
          "type": "string"
        }
      ],
      "encoding": "UTF-8",
      "fieldDelimiter": "\t"
    }
  },
  "target": {
    "appid": "test-appid",
    "table": "test_table_abc",
    "table_desc": "mysql测试表",
    "partition_value": "@[{yyyy-MM-dd}-{1day}]",
    "column": [
      {
        "name": "db_id",
        "type": "bigint",
        "comment": "db序号"
      },
      {
        "name": "log_time",
        "type": "timestamp",
        "comment": "时间戳"
      }
    ]
  }
}

parameter参数说明

  • protocol

    • 描述:ftp服务器协议,目前支持传输协议有ftp和sftp。
    • 类型: string
    • 必选:
    • 默认值:无
  • host

    • 描述:ftp服务器地址。
    • 类型: string
    • 必选:
    • 默认值:无
  • port

    • 描述:ftp服务器端口。
    • 类型: int
    • 必选:否
    • 默认值:若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21
  • connectPattern

    • 描述:连接模式(主动模式或者被动模式)。该参数只在传输协议是标准ftp协议时使用,值只能为:PORT (主动),PASV(被动)。
    • 类型: string
    • 必选:否
    • 默认值:PASV
  • username

    • 描述:ftp服务器访问用户名。
    • 类型: string
    • 必选:
    • 默认值:无
  • password

    • 描述:ftp服务器访问密码。
    • 类型: string
    • 必选:
    • 默认值:无
  • path

    • 描述:远程ftp文件系统的路径信息,注意这里可以支持填写多个路径。 当指定通配符,导入工具尝试遍历出多个文件信息。例如: 指定/data/*,代表读取/data目录下所有的文件。目前只支持*号作为文件通配符。 特别需要注意的是,导入工具会将一个作业下同步的所有Text File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件必须保证为类CSV格式。
    • 类型: string
    • 必选:
    • 默认值:无
  • column

    • 描述:读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。
      默认情况下,用户可以全部按照string类型读取数据,配置如下:

       "column": ["*"]
      

      用户可以指定Column字段信息,配置如下:

     {
           "type": "long",
           "index": 0    //从远程FTP文件文本第一列获取long字段
        },
        {
           "type": "string",
           "value": "2018-07-01 00:00:00"  //生成一个常量字段作为第二列
        }
    

    对于用户指定Column信息,type必须填写,index/value必须选择其一。
    type的取值范围是:longdoublestringboolean

    • 类型: jsonArray
    • 必选:
    • 默认值:无
  • fieldDelimiter

    • 描述:读取的字段分隔符
    • 类型: string
    • 必选:
    • 默认值:,
  • compress

    • 描述:文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为zipgzipbzip2
    • 类型: string
    • 必选:否
    • 默认值:没有压缩
  • encoding

    • 描述:读取文件的编码配置。
    • 类型: string
    • 必选:否
    • 默认值:utf-8
  • skipHeader

    • 描述:类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过。
    • 类型: boolean
    • 必选:否
    • 默认值:false
  • nullFormat

    • 描述:文本文件中无法使用标准字符串定义null(空指针),ta-tool提供nullFormat定义哪些字符串可以表示为null。 例如如果用户配置: nullFormat:"\N",那么如果源头数据是"\N"ta-tool视作null字段。
    • 类型: string
    • 必选:否
    • 默认值:\N

2.2.3 txtfile数据源

txtfile数据源是读本地服务器上的文件,导入到TA的系统表内,目前txtfile的使用限制及特性如下:

  1. 支持且仅支持读取TXT的文件,且要求TXT中schema为一张二维表
  2. 支持类CSV格式文件,自定义分隔符
  3. 支持多种类型数据读取(使用string表示),支持列裁剪,支持列常量
  4. 支持递归读取、支持文件名过滤
  5. 支持文本压缩,现有压缩格式为zipgzipbzip2

配置样例

{
  "parallel_num": 5,
  "source": {
    "type": "txtfile",
    "parameter": {
      "path": [
        "/home/ftp/data/testData/*"
      ],
      "column": [
        {
          "index": 0,
          "type": "long"
        },
        {
          "index": 1,
          "type": "string"
        }
      ],
      "encoding": "UTF-8",
      "fieldDelimiter": "\t"
    }
  },
  "target": {
    "appid": "test-appid",
    "table": "test_table_abc",
    "table_desc": "mysql测试表",
    "partition_value": "@[{yyyy-MM-dd}-{1day}]",
    "column": [
      {
        "name": "db_id",
        "type": "bigint",
        "comment": "db序号"
      },
      {
        "name": "log_time",
        "type": "timestamp",
        "comment": "时间戳"
      }
    ]
  }
}

parameter参数说明

  • path

    • 描述:本地文件系统的路径信息,注意这里可以支持填写多个路径。 当指定通配符,导入工具尝试遍历出多个文件信息。例如: 指定/data/*,代表读取/data目录下所有的文件。目前只支持*号作为文件通配符。 特别需要注意的是,导入工具会将一个作业下同步的所有Text File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件必须保证为类CSV格式。
    • 类型: string
    • 必选:
    • 默认值:无
  • column

    • 描述:读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。
      默认情况下,用户可以全部按照string类型读取数据,配置如下:

       "column": ["*"]
      

      用户可以指定Column字段信息,配置如下:

      {
           "type": "long",
           "index": 0    //从远程FTP文件文本第一列获取long字段
        },
        {
           "type": "string",
           "value": "2018-07-01 00:00:00"  //生成一个常量字段作为第二列
        }
      

      对于用户指定Column信息,type必须填写,index/value必须选择其一。 type的取值范围是:longdoublestringboolean

    • 类型: jsonArray
    • 必选:
    • 默认值:无
  • fieldDelimiter

    • 描述:读取的字段分隔符
    • 类型: string
    • 必选:
    • 默认值:,
  • compress

    • 描述:文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为zipgzipbzip2
    • 类型: string
    • 必选:否
    • 默认值:没有压缩
  • encoding

    • 描述:读取文件的编码配置。
    • 类型: string
    • 必选:否
    • 默认值:utf-8
  • skipHeader

    • 描述:类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过。
    • 类型: boolean
    • 必选:否
    • 默认值:false
  • nullFormat

    • 描述:文本文件中无法使用标准字符串定义null(空指针),ta-tool提供nullFormat定义哪些字符串可以表示为null。 例如如果用户配置: nullFormat:"\N",那么如果源头数据是"\N",ta-tool视作null字段。
    • 类型: string
    • 必选:否
    • 默认值:\N

results matching ""

    No results matching ""