ClickHouse insert and insert_deduplicate

Posted on: May 13, 2022 at 07:54 PM

Reproducing the problem

CREATE TABLE test ON CLUSTER `{cluster}`
(
    `timestamp` DateTime,
    `contractid` UInt32,
    `userid` UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/default/test', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (contractid, toDate(timestamp), userid)
SAMPLE BY userid;

insert into test ( userid ,contractid ,  timestamp ) values (1,1,'2022-02-02');
SELECT *  FROM test;

┌───────────timestamp─┬─contractid─┬─userid─┐
│ 2022-02-02 00:00:00 │          1 │      1 │
└─────────────────────┴────────────┴────────┘

Second insert

insert into test ( userid ,contractid ,  timestamp ) values (1,1,'2022-02-02');

The result is still a single row:

:) insert into test ( userid ,contractid ,  timestamp ) values (1,1,'2022-02-02');

INSERT INTO test (userid, contractid, timestamp) FORMAT Values

Query id: 706e2447-95eb-4515-a7b7-cf363512b673

Ok.

1 row in set. Elapsed: 0.056 sec. 

dai-MS-7B89 :) select * from test

SELECT *
FROM test

Query id: 3ba7cd7f-4621-4286-8646-79737ec3e763

┌───────────timestamp─┬─contractid─┬─userid─┐
│ 2022-02-02 00:00:00 │          1 │      1 │
└─────────────────────┴────────────┴────────┘

1 row in set. Elapsed: 0.030 sec. 

If the exact same data is inserted twice, ClickHouse deduplicates the second insert, so after two inserts the table still contains only one row.
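
Note that deduplication is applied per inserted block (per partition), not per individual row: the block ID is a hash of the block's data together with the partition ID, as the source excerpt further down shows. As a rough illustration (assuming default settings), re-inserting the same row together with a new row that lands in the same partition produces a block with a different hash, so nothing is skipped and the duplicate row is stored a second time:

-- both rows fall into partition 202202; the combined block hashes differently
-- from the original single-row block, so (1, 1, '2022-02-02') is written again
insert into test (userid, contractid, timestamp) values (1, 1, '2022-02-02'), (2, 1, '2022-02-03');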

How to work around it

ClickHouse provides the setting insert_deduplicate to control whether inserts are deduplicated:

set insert_deduplicate=0;

Re-inserting the same row afterwards is then no longer dropped by deduplication:

insert into test ( userid ,contractid ,  timestamp ) values (1,1,'2022-02-02');

INSERT INTO test (userid, contractid, timestamp) FORMAT Values

Query id: a8df989b-0b63-4b45-a1b8-22c13b18bf0a

Ok.

1 row in set. Elapsed: 0.070 sec. 

dai-MS-7B89 :) select * from test

SELECT *
FROM test

Query id: e077b55e-bfd9-4678-ae46-9fc05714b3f7

┌───────────timestamp─┬─contractid─┬─userid─┐
│ 2022-02-02 00:00:00 │          1 │      1 │
└─────────────────────┴────────────┴────────┘
┌───────────timestamp─┬─contractid─┬─userid─┐
│ 2022-02-02 00:00:00 │          1 │      1 │
└─────────────────────┴────────────┴────────┘
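
Instead of changing the session-level setting, recent ClickHouse versions also accept the setting directly on the INSERT statement; a sketch, assuming your server version supports the SETTINGS clause in INSERT:

insert into test (userid, contractid, timestamp)
settings insert_deduplicate = 0
values (1, 1, '2022-02-02');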

Log and source code analysis

Log analysis

2022.05.15 23:32:04.515912 [ 68323 ] {64b40d4f-0d00-4747-9af3-4afb56b6a84b} <Trace> MergedBlockOutputStream: filled checksums 202202_2_2_0 (state Temporary)
2022.05.15 23:32:04.517872 [ 68323 ] {64b40d4f-0d00-4747-9af3-4afb56b6a84b} <Debug> default.test (7d656761-7cd0-4866-a43e-f0e4cea97654) (Replicated OutputStream): Wrote block with ID '202202_8166901380224458449_12408515745921908624', 1 rows
2022.05.15 23:32:04.533981 [ 68323 ] {64b40d4f-0d00-4747-9af3-4afb56b6a84b} <Information> default.test (7d656761-7cd0-4866-a43e-f0e4cea97654) (Replicated OutputStream): Block with ID 202202_8166901380224458449_12408515745921908624 already exists locally as part 202202_0_0_0; ignoring it.
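
The block IDs used for deduplication are kept under the table's blocks node in ZooKeeper, and only a limited number of recent blocks is remembered (controlled by the MergeTree setting replicated_deduplication_window, 100 by default), so deduplication only applies against recent inserts. A sketch of how to inspect the stored IDs; the {cluster}/{shard} macros in the path have to be replaced with their real values, which the first query reports:

-- find the real ZooKeeper path of the table
SELECT zookeeper_path FROM system.replicas WHERE database = 'default' AND table = 'test';

-- list the remembered block IDs (substitute the path returned above)
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/{cluster}/{shard}/default/test/blocks';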

Debugging clickhouse with lldb

lldb ./clickhouse-server
void ReplicatedMergeTreeSink::consume(Chunk chunk)
{
    auto block = getHeader().cloneWithColumns(chunk.detachColumns());

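    /// ... (elided here: the block is split into per-partition blocks and the
    /// code below runs once for each resulting block, current_block) ...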
        String block_id;

        if (deduplicate)  // set earlier from the insert_deduplicate setting
        {
            String block_dedup_token;  // build the dedup token

            /// We add the hash from the data and partition identifier to deduplication ID.
            /// That is, do not insert the same data to the same partition twice.

            const String & dedup_token = settings.insert_deduplication_token;
            if (!dedup_token.empty())
            {
                /// multiple blocks can be inserted within the same insert query
                /// an ordinal number is added to dedup token to generate a distinctive block id for each block
                block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
                ++chunk_dedup_seqnum;
            }

            block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
            LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
        }
        else
        {
            LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows());
        }
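
The excerpt also references the insert_deduplication_token setting: when it is non-empty, the block ID is derived from the token (plus an ordinal per block) rather than from the data hash, so the client can control deduplication explicitly. A hedged sketch, with an arbitrary token value chosen for illustration:

-- with the same token, a repeated insert is treated as a duplicate;
-- with a different token, even identical data is inserted again
insert into test (userid, contractid, timestamp)
settings insert_deduplication_token = 'manual-batch-0001'
values (1, 1, '2022-02-02');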

Related reading