之前相关的文章:
apache carbondata 介绍
CarbonData 列式存储文件结构
CarbonData 数据管理
本教程将介绍 CarbonData 上的所有命令和数据操作。
- CREATE TABLE
- CREATE EXTERNAL TABLE
- CREATE DATABASE
- TABLE MANAGEMENT
- LOAD DATA
- UPDATE AND DELETE
- COMPACTION
- PARTITION
- BUCKETING
- SEGMENT MANAGEMENT
CREATE TABLE
这个命令可用于通过指定字段列表以及表格属性来创建 CarbonData 表,您还可以指定表存储的位置。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name[(col_name data_type , ...)]
STORED BY 'carbondata'
[TBLPROPERTIES (property_name=property_value, ...)]
[LOCATION 'path']
注意: CarbonData 同时支持 "STORED AS carbondata" 和 "USING carbondata",请访问 CarbonData 库中的示例代码 CarbonSessionExample 。
使用指南
以下是 TBLPROPERTIES 的使用原则,CarbonData 的附加表选项可以通过 carbon.properties 设置。
- 字典编码配置
- 从 1.3 开始,字典编码默认对所有的列关闭,你可以使用此命令来包含或排除列以进行字典编码。建议使用用例:对低基数(low cardinality)列进行字典编码,可能有助于提高数据压缩率和性能。
- TBLPROPERTIES ('DICTIONARY_INCLUDE'='column1, column2')
- 倒排索引配置
- 默认情况下,倒排索引是启用的, 这可能有助于提高压缩率和查询速度,特别是对除于有利位置的低基数列。建议使用用例:对于高基数(high cardinality)列,你可以禁用倒排索引以提高数据加载性能。
- TBLPROPERTIES ('NO_INVERTED_INDEX'='column1, column3')
- 排序列的配置
- 这个属性供用户指定哪些列属于MDK(Multi-Dimensions-Key)索引。
- 如果用户没有指定 "SORT_COLUMN" 属性, 默认情况下,除了复杂类型的列其他类型的列都会创建 MDK 索引。
- 如果指定了这个属性,但是没有指定参数,这样表在加载的时候不会被排序
- 这个属性只支持 string, date, timestamp, short, int, long 以及 boolean 数据类型。建议使用用例: 只为需要的列建立 MDK 索引,这可能有助于提高数据加载性能。
- TBLPROPERTIES ('SORT_COLUMNS'='column1, column3')
- OR
- TBLPROPERTIES ('SORT_COLUMNS'='')
- 排序范围配置
- 这个属性供用户在数据加载期间指定排序范围,以下是排序范围的类型。
- LOCAL_SORT: 这是默认的排序范围
- NO_SORT: 这将会以未经排序的方式加载数据,会显着提高负载性能。
- BATCH_SORT: 如果块个数 > 并行性,它将增加数据的加载性能,但会减少数据的查询性能。
- GLOBAL_SORT: 这会增加数据的查询性能,特别是高并发查询。如果你特别关心加载资源的隔离时使用,因为系统使用 Spark 的 GroupBy 对数据进行排序,我们可以通过 Spark 来控制资源。
- 表块大小配置
- 这个属性用于设置表的块大小,默认值是 1024 MB,这个属性只能设置在 1MB ~ 2048MB 之间。
- TBLPROPERTIES ('TABLE_BLOCKSIZE'='512')
- 注意: 512 或者 512M 两种写法都支持。
- 表压缩(Compaction)配置
- 这个属性是表级别的压缩配置,如果没有指定,carbon.properties 文件中系统级别的配置将会被使用。以下是5种配置:
- MAJOR_COMPACTION_SIZE: 和 carbon.major.compaction.size 参数的意思一致,单位是 MB。
- AUTO_LOAD_MERGE: 和 carbon.enable.auto.load.merge 参数的意思一致。
- COMPACTION_LEVEL_THRESHOLD: 和 carbon.compaction.level.threshold 参数的意思一致。
- COMPACTION_PRESERVE_SEGMENTS: 和 carbon.numberof.preserve.segments 参数的意思一致。
- ALLOWED_COMPACTION_DAYS: 和 carbon.allowed.compaction.days 参数的意思一致。
- TBLPROPERTIES ('MAJOR_COMPACTION_SIZE'='2048',
- 'AUTO_LOAD_MERGE'='true',
- 'COMPACTION_LEVEL_THRESHOLD'='5,6',
- 'COMPACTION_PRESERVE_SEGMENTS'='10',
- 'ALLOWED_COMPACTION_DAYS'='5')
- Streaming
- CarbonData 支持流式摄取实时数据。您可以使用以下表属性创建 streaming 表。
- TBLPROPERTIES ('streaming'='true')
示例:
CREATE TABLE IF NOT EXISTS productSchema.productSalesTable (
productNumber INT,
productName STRING,
storeCity STRING,
storeProvince STRING,
productCategory STRING,
productBatch STRING,
saleQuantity INT,
revenue INT)
STORED BY 'carbondata'
TBLPROPERTIES ('SORT_COLUMNS'='productName,storeCity',
'SORT_SCOPE'='NO_SORT')
注意: CarbonData 同时支持 "STORED AS carbondata" 和 "USING carbondata",请访问 CarbonData 库中的示例代码 CarbonSessionExample 。
CREATE TABLE AS SELECT
这个功能允许用户从任何 Parquet/Hive/Carbon表来创建 Carbon 表。当用户想要从任何其他 Parquet/Hive 表创建 Carbon 表,然后使用 Carbon 查询引擎查询并获得更好的查询结果时更有用。当然,这种方式也可用于备份数据。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
STORED BY 'carbondata'
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
AS select_statement;
示例
carbon.sql("CREATE TABLE source_table(
id INT,
name STRING,
city STRING,
age INT)
STORED AS parquet")
carbon.sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
carbon.sql("INSERT INTO source_table SELECT 2,'david','shenzhen',31")
carbon.sql("CREATE TABLE target_table
STORED BY 'carbondata'
AS SELECT city,avg(age) FROM source_table GROUP BY city")
carbon.sql("SELECT * FROM target_table").show
// results:
// +--------+--------+
// | city|avg(age)|
// +--------+--------+
// |shenzhen| 29.0|
// +--------+--------+
CREATE EXTERNAL TABLE
该功能允许用户通过指定位置来创建外部表。
CREATE EXTERNAL TABLE [IF NOT EXISTS] [db_name.]table_name
STORED BY 'carbondata' LOCATION ?$FilesPath?
在托管表(managed table)数据位置上创建外部表。
托管表数据位置将包含 FACT 和 Metadata 文件夹。 这些数据可以通过创建一个普通的 carbon 表来生成,并使用这个路径作为上面语法中 $FilesPath 的值。
示例:
sql("CREATE TABLE origin(key INT, value STRING) STORED BY 'carbondata'")
sql("INSERT INTO origin select 100,'spark'")
sql("INSERT INTO origin select 200,'hive'")
// creates a table in $storeLocation/origin
sql(s"""
|CREATE EXTERNAL TABLE source
|STORED BY 'carbondata'
|LOCATION '$storeLocation/origin'
""".stripMargin)
checkAnswer(sql("SELECT count(*) from source"), sql("SELECT count(*) from origin"))
在非事务(Non-Transactional)表数据位置上创建外部表。
非事务(Non-Transactional)表的数据位置里面仅仅包含 carbondata 和 carbonindex 文件,里面不会有 metadata 文件夹(表状态和模式)。 我们的 SDK 模块目前仅支持以这种格式写入数据。
示例:
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
这里的 $writerPath 文件夹里面只会有 carbondata 和索引文件。 可能是 SDK 的输出文件。请参见 SDK 写入指南。
注意: 删除外部表不会删除路径下的文件。
CREATE DATABASE
这个功能用于创建一个新的数据库。默认情况下,数据库是在 Carbon 的 store location 创建的,但你也可以指定自定义位置。
CREATE DATABASE [IF NOT EXISTS] database_name [LOCATION path];
示例
CREATE DATABASE carbon LOCATION ?hdfs://name_cluster/dir1/carbonstore?;
TABLE MANAGEMENT
SHOW TABLE
这个命令可用于列出当前数据库中的所有表或着特定数据库中的所有表。
SHOW TABLES [IN db_Name]
示例:
SHOW TABLES
OR
SHOW TABLES IN defaultdb
ALTER TABLE
以下章节介绍用于修改现有表的物理或逻辑状态的命令。
- RENAME TABLE
- 这个命令用于重命名当前表
- ALTER TABLE [db_name.]table_name RENAME TO new_table_name
- 示例:
- ALTER TABLE carbon RENAME TO carbonTable
- OR
- ALTER TABLE test_db.carbon RENAME TO test_db.carbonTable
- ADD COLUMNS
- 这个命令用于在已存在的表中添加新的列。
- ALTER TABLE [db_name.]table_name ADD COLUMNS (col_name data_type,...)
- TBLPROPERTIES('DICTIONARY_INCLUDE'='col_name,...',
- 'DEFAULT.VALUE.COLUMN_NAME'='default_value')
- 示例:
- ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING)
- ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING) TBLPROPERTIES('DICTIONARY_INCLUDE'='a1')
- ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING) TBLPROPERTIES('DEFAULT.VALUE.a1'='10')
- DROP COLUMNS
- 这个命令可以删除给定表中的列
- ALTER TABLE [db_name.]table_name DROP COLUMNS (col_name, ...)
- 示例:
- ALTER TABLE carbon DROP COLUMNS (b1)
- OR
- ALTER TABLE test_db.carbon DROP COLUMNS (b1)
- ALTER TABLE carbon DROP COLUMNS (c1,d1)
- CHANGE DATA TYPE
- 这个命令用于将数据类型从 INT 更改为 BIGINT 或将 decimal 从低精度更改为更高的精度。仅在没有数据丢失的情况下才支持将 decimal 数据类型从较低精度更改为较高精度。
- ALTER TABLE [db_name.]table_name CHANGE col_name col_name changed_column_type
- 有效方案
- 无效的场景 - 将 decimal 的精度从(10,2)转换成(10,5)是无效的,因为在这种情况下,只有比例(译者注 2变成5)增加,但总位数保持不变。
- 有效的场景 - 将 decimal 的精度从(10,2)转换成(12,3)是有效的,因为总的位数增加了2(译者注 10变成12),比例只增加 1,这种情况不会出现数据丢数。
- 注意: 允许的范围是 38,38(精度,范围),并且是一种有效的大写(upper case)情况,不会导致数据丢失。
- 示例1:将 a1 的数据类型从 INT 转换成 BIGINT。
- ALTER TABLE test_db.carbon CHANGE a1 a1 BIGINT
- 示例2:将 a1 的 decimal 精度从 10 转换成 18。
- ALTER TABLE test_db.carbon CHANGE a1 a1 DECIMAL(18,2)
DROP TABLE
这个命令用于删除已存在的表
DROP TABLE [IF EXISTS] [db_name.]table_name
示例:
DROP TABLE IF EXISTS productSchema.productSalesTable
REFRESH TABLE
该命令用于从现有 Carbon 表数据将 Carbon 表注册到 HIVE 元存储目录。
REFRESH TABLE $db_NAME.$table_NAME
示例:
REFRESH TABLE dbcarbon.productSalesTable
注意:
- 新的数据库名字必须和老的数据库名字一致。
- 在执行此命令之前,应将旧表的模式和数据复制到新数据库相应的位置。
- 如果表是聚合表,所有的聚合表需要复制到新的数据库中。
- 对于旧存储(old store),源和目标群集的时区应该相同。
- 如果旧集群使用 HIVE 元存储来存储模式,则刷新将不起作用,因为模式文件没有存储在文件系统中。
表和列注释
你可以通过表注释来提供更多的信息。同样,你也可以使用列注释提供某列的更多信息。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
STORED BY 'carbondata'
[TBLPROPERTIES (property_name=property_value, ...)]
示例:
CREATE TABLE IF NOT EXISTS productSchema.productSalesTable (
productNumber Int COMMENT 'unique serial number for product')
COMMENT ?This is table comment?
STORED BY 'carbondata'
TBLPROPERTIES ('DICTIONARY_INCLUDE'='productNumber')
你也可以使用 ALTER 命令来 SET 和 UNSET 表的注释。
SET 表注释的例子:
ALTER TABLE carbon SET TBLPROPERTIES ('comment'='this table comment is modified');
UNSET 表注释的例子:
ALTER TABLE carbon UNSET TBLPROPERTIES ('comment');
LOAD DATA
将数据加载到 CARBONDATA 表中
这个命令用于加载 csv 格式的文件到 carbondata 表中, OPTIONS 参数在加载数据的过程中是可选的。在 OPTIONS 中我们可以指定任何的先决条件,比如:DELIMITER, QUOTECHAR, FILEHEADER, ESCAPECHAR, MULTILINE。
LOAD DATA [LOCAL] INPATH 'folder_path'
INTO TABLE [db_name.]table_name
OPTIONS(property_name=property_value, ...)
你可以使用下面的选项来加载数据:
- DELIMITER: 加载命令中可以指定数据的分隔符。
- OPTIONS('DELIMITER'=',')
- QUOTECHAR: 加载命令中可以指定数据的引用字符。
- OPTIONS('QUOTECHAR'='"')
- COMMENTCHAR: 可以在加载命令中提供注释标识符,用于注释掉你不需要的数据。
- OPTIONS('COMMENTCHAR'='#')
- HEADER: 如果你加载不带文件头的 CSV 文件并且文件头和表的模式一致,这时候你可以在加载数据的 SQL 里面加上 'HEADER'='false',这时候用户就不需要指定文件头。默认情况下这个属性的值是 'true'。 false: CSV 文件不带文件头;true: CSV 文件带文件头。
- OPTIONS('HEADER'='false')
- 注意: 如果 HEADER 选项存在,并且其值为 'true',这时候 FILEHEADER 选项就可选了。
- FILEHEADER: 如果源文件里面不存在头信息,那么可以通过这个选项在 LOAD DATA 命令提供 Headers 。
- OPTIONS('FILEHEADER'='column1,column2')
- MULTILINE: CSV 引号中带有换行符。
- OPTIONS('MULTILINE'='true')
- ESCAPECHAR: 如果用户希望严格验证 CSV 文件中的转义字符,可以提供转义字符。
- OPTIONS('ESCAPECHAR'='\')
- SKIP_EMPTY_LINE: 在数据加载过程中忽略 CSV 文件中的空行。
- OPTIONS('SKIP_EMPTY_LINE'='TRUE/FALSE')
- COMPLEX_DELIMITER_LEVEL_1: 分割一行中的复杂类型的数据列 (eg., a$b$c --> Array = {a,b,c}).
- OPTIONS('COMPLEX_DELIMITER_LEVEL_1'='$')
- COMPLEX_DELIMITER_LEVEL_2: 分割一行中嵌套的复杂类型数据列. 根据复杂数据类型,提供 level_1 & level_2 分隔符(eg., a:b$c:d --> Array> = {{a,b},{c,d}}).
- OPTIONS('COMPLEX_DELIMITER_LEVEL_2'=':')
- ALL_DICTIONARY_PATH: 所有字典文件的路径。
- OPTIONS('ALL_DICTIONARY_PATH'='/opt/alldictionary/data.dictionary')
- COLUMNDICT: 指定列的字典文件路径.
- OPTIONS('COLUMNDICT'='column1:dictionaryFilePath1,column2:dictionaryFilePath2')
- 注意: ALL_DICTIONARY_PATH 和 COLUMNDICT 不能同时使用。
- DATEFORMAT/TIMESTAMPFORMAT: 指定列的日期和时间戳格式。
- OPTIONS('DATEFORMAT' = 'yyyy-MM-dd','TIMESTAMPFORMAT'='yyyy-MM-dd HH:mm:ss')
- 注意: 日期格式由日期模式字符串指定。 Carbondata 中的日期模式字符串和 JAVA 中的一致,请参见SimpleDateFormat.
- SORT COLUMN BOUNDS: 排序列的范围界限。
- 假设表是用 'SORT_COLUMNS'='name,id' 创建的,并且 name 的范围是aaa ~ zzz,id 的值范围是0 ~ 1000。那么在数据加载的时候,我们可以通过指定下面的选项来加强数据加载的性能。
- OPTIONS('SORT_COLUMN_BOUNDS'='f,250;l,500;r,750')
- 每个范围使用 ';' 分割,范围里面每个字段值使用 ','。在上面的例子中,我们提供了 3 个边界来将记录分布到 4 个分区。 'f','l','r' 边界值可以平均分配(evenly distribute)记录。在 carbondata 内部,为了记录,我们比较排序列的值和边界的值,并决定记录将被转发到哪个分区。
- 注意:
- 仅在 SORT_SCOPE 的值为 'local_sort' 的时候,SORT_COLUMN_BOUNDS 才会被使用。
- Carbondata 将在最终的排序过程中使用这些边界作为范围来同时处理数据。记录将在每个分区内排序并写出。由于分区已排序,所有记录将被排序。
- 由于字典列的实际顺序和字面顺序不一定相同,因此如果第一个排序列是'dictionary_include',我们不建议您使用此功能。
- 如果您在加载数据期间 CPU 的使用率较低,该选项可以更好地工作。如果您的系统已经处于紧张状态,最好不要使用此选项。此外,性能还取决于用户指定的边界。如果用户不知道使数据在边界之间均匀分布的确切边界,加载性能仍然会比以前好或至少与以前相同。
- 用户可以在 PR1953 的描述中找到有关该选项的更多信息。
- SINGLE_PASS: Single Pass 加载可以使用单个作业即时完成数据加载以及字典生成。它增强了在初始加载数据之后的涉及字典上很少增量更新的后续数据加载场景下的性能。
该选项指定是否使用单通道加载数据。 默认情况下,该选项设置为 FALSE。
OPTIONS('SINGLE_PASS'='TRUE')
注意:
- 如果此选项设置为 TRUE,则数据加载将花费较少的时间。
- 如果此选项设置为除 TRUE 或 FALSE 之外的某个无效值,则会使用默认值。
示例:
LOAD DATA local inpath '/opt/rawdata/data.csv' INTO table carbontable
options('DELIMITER'=',', 'QUOTECHAR'='"','COMMENTCHAR'='#',
'HEADER'='false',
'FILEHEADER'='empno,empname,designation,doj,workgroupcategory,
workgroupcategoryname,deptno,deptname,projectcode,
projectjoindate,projectenddate,attendance,utilization,salary',
'MULTILINE'='true','ESCAPECHAR'='\','COMPLEX_DELIMITER_LEVEL_1'='$',
'COMPLEX_DELIMITER_LEVEL_2'=':',
'ALL_DICTIONARY_PATH'='/opt/alldictionary/data.dictionary',
'SINGLE_PASS'='TRUE')
- BAD RECORDS HANDLING: 处理坏记录(bad records)的方法如下:
- 在处理错误之前加载所有数据。
- 在加载数据前清理或删除坏记录,或在发现坏记录时停止数据加载。
- OPTIONS('BAD_RECORDS_LOGGER_ENABLE'='true', 'BAD_RECORD_PATH'='hdfs://hacluster/tmp/carbon', 'BAD_RECORDS_ACTION'='REDIRECT', 'IS_EMPTY_DATA_BAD_RECORD'='false')
注意:
- 对于坏记录,BAD_RECORDS_ACTION 属性主要有四种操作类型:FORCE, REDIRECT, IGNORE 和 FAIL。
- FAIL 是其默认的值。如果使用 FAIL 选项,则如果发现任何坏记录,数据加载将会失败。
- 如果使用了 REDIRECT 选项, CarbonData 会将所有坏记录添加到单独的 CSV 文件中。 但是,在后续的数据加载我们不能使用这个文件,因为其内容可能与原始记录不完全匹配。建议你事先对原始源数据进行清理,以进行下一步的数据处理。此选项主要用于提醒你哪些记录是坏记录
- 如果使用了 FORCE 选项,那么会在加载数据之前将坏记录存储为 NULL,然后再存储。
- 如果使用了 IGNORE 选项,那么坏记录不会被加载,也不会写到单独的 CSV 文件中。
- 在加载的数据中,如果所有记录都是坏记录,则 BAD_RECORDS_ACTION 选项将会无效,加载操作会失败。
- 每列的最大字符数为 32000,如果列中的字符数超过 32000 个,则数据加载将会失败。
示例:
LOAD DATA INPATH 'filepath.csv' INTO TABLE tablename
OPTIONS('BAD_RECORDS_LOGGER_ENABLE'='true','BAD_RECORD_PATH'='hdfs://hacluster/tmp/carbon',
'BAD_RECORDS_ACTION'='REDIRECT','IS_EMPTY_DATA_BAD_RECORD'='false')
将数据插入到 CARBONDATA 表中
这个命令可以将数据插入到 CarbonData 中,它被分别定义为 Insert 和 Select 查询的组合。它将源表中的记录插入到 CarbonData 表中,源表可以是一个 Hive 表、Parquet 表 或者是 CarbonData 表。它具有通过在源表上执行 Select 查询来聚合表的记录并将其处理的结果加载到 CarbonData 表中的功能。
INSERT INTO TABLE <CARBONDATA TABLE> SELECT * FROM sourceTableName
[ WHERE { <filter_condition> } ]
你可以忽略掉 table 关键字,并按照下面的格式编写你的查询:
INSERT INTO <CARBONDATA TABLE> SELECT * FROM sourceTableName
[ WHERE { <filter_condition> } ]
覆盖插入数据:
INSERT OVERWRITE TABLE <CARBONDATA TABLE> SELECT * FROM sourceTableName
[ WHERE { <filter_condition> } ]
注意:
- 源表和 CarbonData 表必须具有相同的表模式。
- 源表和目标表列的数据类型必须相同
- 如果发现坏记录,INSERT INTO 命令不支持部分成功,这种情况下它会直接失败。
- 在数据从源表插入到目标表的过程中,源表不能进行插入或更新操作。
示例
INSERT INTO table1 SELECT item1, sum(item2 + 1000) as result FROM table2 group by item1
INSERT INTO table1 SELECT item1, item2, item3 FROM table2 where item2='xyz'
INSERT OVERWRITE TABLE table1 SELECT * FROM TABLE2
UPDATE AND DELETE
UPDATE
这个命令允许根据列表达式和可选的过滤条件更新来 CarbonData 表。
UPDATE <table_name>
SET (column_name1, column_name2, ... column_name n) = (column1_expression , column2_expression, ... column n_expression )
[ WHERE { <filter_condition> } ]
或者,下面的命令也可以用于更新 CarbonData 表:
UPDATE <table_name>
SET (column_name1, column_name2) =(select sourceColumn1, sourceColumn2 from sourceTable [ WHERE { <filter_condition> } ] )
[ WHERE { <filter_condition> } ]
注意:如果源表中的多个输入行与目标表中的单行匹配,update 命令将会失败。
示例:
UPDATE t3 SET (t3_salary) = (t3_salary + 9) WHERE t3_name = 'aaa1'
UPDATE t3 SET (t3_date, t3_country) = ('2017-11-18', 'india') WHERE t3_salary < 15003
UPDATE t3 SET (t3_country, t3_name) = (SELECT t5_country, t5_name FROM t5 WHERE t5_id = 5) WHERE t3_id < 5
UPDATE t3 SET (t3_date, t3_serialname, t3_salary) = (SELECT '2099-09-09', t5_serialname, '9999' FROM t5 WHERE t5_id = 5) WHERE t3_id < 5
UPDATE t3 SET (t3_country, t3_salary) = (SELECT t5_country, t5_salary FROM t5 FULL JOIN t3 u WHERE u.t3_id = t5_id and t5_id=6) WHERE t3_id >6
DELETE
这个命令允许我们从 CarbonData 表中删除记录。
DELETE FROM table_name [WHERE expression]
示例:
DELETE FROM carbontable WHERE column1 = 'china'
DELETE FROM carbontable WHERE column1 IN ('china', 'USA')
DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2)
DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2 WHERE column1 = 'USA')
COMPACTION
压缩可以显着地提高查询性能。
有几种类型的 Compaction。
ALTER TABLE [db_name.]table_name COMPACT 'MINOR/MAJOR/CUSTOM'
- Minor Compaction
在 Minor compaction 中,用户可以指定要合并的文件数量。如果 carbon.enable.auto.load.merge 参数设置为 true,则 Minor compaction 会在每次数据加载的时候触发。如果有 segments 可以合并,则 minor compaction 将与数据加载并行运行,minor compaction 有两个级别:
- Level 1: 合并尚未压缩的 segments。
- Level 2: 合并已经压缩(Level 1)的 segments 以形成更大的 segment。
ALTER TABLE table_name COMPACT 'MINOR'
- Major Compaction
在 Major compaction 中,多个 segments 可以合并成一个大的 segment。用户需要指定压缩的大小以便可以合并 segments,Major compaction 通常在非高峰时段完成。可以选择一个适当的值来配置 carbon.major.compaction.size 属性,这个属性的单位是 MB。
下面命令将指定数量的 segments 合并到一个 segment 中:
ALTER TABLE table_name COMPACT 'MAJOR'
- Custom Compaction
在自定义压缩中,用户可以直接指定要合并到一个大片段中的 segment ID。 所有指定的分段 ID 都应存在且有效,否则压缩将失败。 自定义压缩通常在非高峰期进行。
ALTER TABLE table_name COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (2,3,4)
- 在压缩完成之后清理 segments
清理已经压缩的 segments:
CLEAN FILES FOR TABLE carbon_table
PARTITION
STANDARD PARTITION
Carbondata 的分区与 Spark 和 Hive 的分区是类似的, 用户可以使用任何的列来建立分区:
创建分区表
下面命令允许你创建带有分区的表。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type , ...)]
[STORED BY file_format]
[TBLPROPERTIES (property_name=property_value, ...)]
示例:
CREATE TABLE IF NOT EXISTS productSchema.productSalesTable (
productNumber INT,
productName STRING,
storeCity STRING,
storeProvince STRING,
saleQuantity INT,
revenue INT)
PARTITIONED BY (productCategory STRING, productBatch STRING)
STORED BY 'carbondata'
使用静态分区加载数据
下面命令允许你使用静态分区来加载数据。
LOAD DATA [LOCAL] INPATH 'folder_path'
INTO TABLE [db_name.]table_name PARTITION (partition_spec)
OPTIONS(property_name=property_value, ...)
INSERT INTO INTO TABLE [db_name.]table_name PARTITION (partition_spec) <SELECT STATEMENT>
示例:
LOAD DATA LOCAL INPATH '${env:HOME}/staticinput.csv'
INTO TABLE locationTable
PARTITION (country = 'US', state = 'CA')
INSERT INTO TABLE locationTable
PARTITION (country = 'US', state = 'AL')
SELECT <columns list excluding partition columns> FROM another_user
使用动态分区加载数据
下面命令允许你使用动态分区来加载数据。如果未指定分区,则该分区被视为动态分区。
示例:
LOAD DATA LOCAL INPATH '${env:HOME}/staticinput.csv'
INTO TABLE locationTable
INSERT INTO TABLE locationTable
SELECT <columns list excluding partition columns> FROM another_user
列出所有分区
这个命令获取表的 Hive 分区信息。
SHOW PARTITIONS [db_name.]table_name
删除分区
这个命令仅删除指定的 Hive 分区。
ALTER TABLE table_name DROP [IF EXISTS] PARTITION (part_spec, ...)
示例:
ALTER TABLE locationTable DROP PARTITION (country = 'US');
Insert OVERWRITE
这个命令允许你在特定分区上插入或加载数据,并覆盖原有数据。
INSERT OVERWRITE TABLE table_name
PARTITION (column = 'partition_name')
select_statement
示例:
INSERT OVERWRITE TABLE partitioned_user
PARTITION (country = 'US')
SELECT * FROM another_user au
WHERE au.country = 'US';
CARBONDATA PARTITION(HASH,RANGE,LIST) -- Alpha 功能,此分区功能不支持更新和删除数据。
这个分区支持三种类型:(Hash,Range,List), 类似于其他系统的分区功能,CarbonData 的分区功能可用于通过对分区列进行过滤来提高查询性能。
创建哈希分区表
下面命令允许你创建一个哈希分区表。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED BY 'carbondata'
[TBLPROPERTIES ('PARTITION_TYPE'='HASH',
'NUM_PARTITIONS'='N' ...)]
注意: N 是哈希分区的个数
示例:
CREATE TABLE IF NOT EXISTS hash_partition_table(
col_A STRING,
col_B INT,
col_C LONG,
col_D DECIMAL(10,2),
col_F TIMESTAMP
) PARTITIONED BY (col_E LONG)
STORED BY 'carbondata' TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='9')
创建范围分区表
下面命令允许你创建一个范围分区表。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED BY 'carbondata'
[TBLPROPERTIES ('PARTITION_TYPE'='RANGE',
'RANGE_INFO'='2014-01-01, 2015-01-01, 2016-01-01, ...')]
注意:
- 'RANGE_INFO' 必须在表属性中按升序定义。
- 日期/时间戳类型的分区列默认格式为yyyy-MM-dd。 我们可以在 CarbonProperties 里面自定义日期/时间戳格式。
示例:
CREATE TABLE IF NOT EXISTS range_partition_table(
col_A STRING,
col_B INT,
col_C LONG,
col_D DECIMAL(10,2),
col_E LONG
) partitioned by (col_F Timestamp)
PARTITIONED BY 'carbondata'
TBLPROPERTIES('PARTITION_TYPE'='RANGE',
'RANGE_INFO'='2015-01-01, 2016-01-01, 2017-01-01, 2017-02-01')
创建列表分区表
下面命令允许你创建一个列表分区表。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type , ...)]
PARTITIONED BY (partition_col_name data_type)
STORED BY 'carbondata'
[TBLPROPERTIES ('PARTITION_TYPE'='LIST',
'LIST_INFO'='A, B, C, ...')]
注意: 列表分区支持一个组级别的列表信息。
示例:
CREATE TABLE IF NOT EXISTS list_partition_table(
col_B INT,
col_C LONG,
col_D DECIMAL(10,2),
col_E LONG,
col_F TIMESTAMP
) PARTITIONED BY (col_A STRING)
STORED BY 'carbondata'
TBLPROPERTIES('PARTITION_TYPE'='LIST',
'LIST_INFO'='aaaa, bbbb, (cccc, dddd), eeee')
列出所有分区
执行以下命令可以获取表的分区信息
SHOW PARTITIONS [db_name.]table_name
添加分区
ALTER TABLE [db_name].table_name ADD PARTITION('new_partition')
分区拆分
ALTER TABLE [db_name].table_name SPLIT PARTITION(partition_id) INTO('new_partition1', 'new_partition2'...)
删除分区
只删除分区定义,但保留数据
ALTER TABLE [db_name].table_name DROP PARTITION(partition_id)
同时删除分区定义和数据
ALTER TABLE [db_name].table_name DROP PARTITION(partition_id) WITH DATA
注意:
- 哈希分区表不支持 ADD, SPLIT 和 DROP 命令
- Partition Id:在 CarbonData 中,分区的划分并不是使用文件夹实现的,而是使用分区 ID。可以利用这个特性,并减少一些元数据。
SegmentDir/0_batchno0-0-1502703086921.carbonindex
^
SegmentDir/part-0-0_batchno0-0-1502703086921.carbondata
^
以下是提高 carbonData 分区表查询性能的一些使用技巧:
- 分区列可以从 SORT_COLUMNS 中排除,这将使其他列可以进行一些高效的排序。
- 如果你使用的表是分区表,请尝试分区列上添加一些过滤条件。
BUCKETING
Bucket 功能可将表/分区的数据分发/组织成多个文件,使得相似的记录在同一个文件中显示。在创建表时,用户需要指定用于分桶的列以及桶的数量。对于桶的选择,使用列的哈希值。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type, ...)]
STORED BY 'carbondata'
TBLPROPERTIES('BUCKETNUMBER'='noOfBuckets',
'BUCKETCOLUMNS'='columnname')
注意:
- 复杂数据类型的列不能作为分桶的字段
- BUCKETCOLUMN 参数中的列必须是维度(dimensions).BUCKETCOLUMN 参数不能是度量(measure)或度量和维度的组合。
示例:
CREATE TABLE IF NOT EXISTS productSchema.productSalesTable (
productNumber INT,
saleQuantity INT,
productName STRING,
storeCity STRING,
storeProvince STRING,
productCategory STRING,
productBatch STRING,
revenue INT)
STORED BY 'carbondata'
TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='productName')
SEGMENT MANAGEMENT
SHOW SEGMENT
这个命令用于列出 CarbonData 表的段(segments)。
SHOW [HISTORY] SEGMENTS FOR TABLE [db_name.]table_name LIMIT number_of_segments
示例: 显示可见的 segments
SHOW SEGMENTS FOR TABLE CarbonDatabase.CarbonTable LIMIT 4
显示所有的 segments, 包括不可见的 segments
SHOW HISTORY SEGMENTS FOR TABLE CarbonDatabase.CarbonTable LIMIT 4
DELETE SEGMENT BY ID
这个命令可以根据 segment ID 来删除 segment。每个 segment 都有一个 segment ID 与其关联。使用这个 segment ID 你就可以删除这个 segment 了。
下面命令可以获取 segment ID.
SHOW SEGMENTS FOR TABLE [db_name.]table_name LIMIT number_of_segments
在你检索到要删除的 segment 的 segment ID后,执行以下命令删除选定的 segment。
DELETE FROM TABLE [db_name.]table_name WHERE SEGMENT.ID IN (segment_id1, segments_id2, ...)
示例:
DELETE FROM TABLE CarbonDatabase.CarbonTable WHERE SEGMENT.ID IN (0)
DELETE FROM TABLE CarbonDatabase.CarbonTable WHERE SEGMENT.ID IN (0,5,8)
DELETE SEGMENT BY DATE
该命令根据用户在 DML 命令中提供的日期从存储中删除 CarbonData segment(s)。在特定日期之前创建的 segment 将从存储中删除。
DELETE FROM TABLE [db_name.]table_name WHERE SEGMENT.STARTTIME BEFORE DATE_VALUE
示例:
DELETE FROM TABLE CarbonDatabase.CarbonTable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'
QUERY DATA WITH SPECIFIED SEGMENTS
该命令用于在 CarbonScan 期间从指定的 segments 中读取数据。
获取 Segment ID:
SHOW SEGMENTS FOR TABLE [db_name.]table_name LIMIT number_of_segments
给表设置 segment IDs
SET carbon.input.segments.<database_name>.<table_name> = <list of segment IDs>
注意: carbon.input.segments: 指定需要查询的 segment IDs。这个命令允许你为表指定需要查询的 segments。这时候 CarbonScan 只会从指定的 segments 读取数据。
如果用户想以多线程模式读取 segments,则使用 CarbonSession。 threadSet 可以用来代替 SET 查询。
CarbonSession.threadSet ("carbon.input.segments.<database_name>.<table_name>","<list of segment IDs>");
重置 segment IDs
SET carbon.input.segments.<database_name>.<table_name> = *;
如果用户想以多线程模式读取 segments,则使用 CarbonSession。 threadSet 可以用来代替 SET 查询。
CarbonSession.threadSet ("carbon.input.segments.<database_name>.<table_name>","*");
示例:
- 下面例子显示 segment ID 列表,segment 状态和其他详细信息,然后指定要读取的 segment 列表。
SHOW SEGMENTS FOR carbontable1;
SET carbon.input.segments.db.carbontable1 = 1,3,9;
- 以多线程模式查询 segments 的示例:
CarbonSession.threadSet ("carbon.input.segments.db.carbontable_Multi_Thread","1,3");
- 多线程环境中的线程集示例(以下显示了如何在 Scala 代码中使用):
def main(args: Array[String]) {
Future {
CarbonSession.threadSet ("carbon.input.segments.db.carbontable_Multi_Thread","1")
spark.sql("select count(empno) from carbon.input.segments.db.carbontable_Multi_Thread").show();
}
}