Reproducing the problem
- The table is created with the following statement:
CREATE TABLE test ON CLUSTER `{cluster}`
(
    `timestamp` DateTime,
    `contractid` UInt32,
    `userid` UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/default/test', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (contractid, toDate(timestamp), userid)
SAMPLE BY userid
- First INSERT:
insert into test (userid, contractid, timestamp) values (1, 1, '2022-02-02');
- The query returns a single row:
SELECT * FROM test;
┌───────────timestamp─┬─contractid─┬─userid─┐
│ 2022-02-02 00:00:00 │          1 │      1 │
└─────────────────────┴────────────┴────────┘
- Second INSERT:
insert into test (userid, contractid, timestamp) values (1, 1, '2022-02-02');
- The result is still a single row:
:) insert into test ( userid ,contractid , timestamp ) values (1,1,'2022-02-02');
INSERT INTO test (userid, contractid, timestamp) FORMAT Values
Query id: 706e2447-95eb-4515-a7b7-cf363512b673
Ok.
1 row in set. Elapsed: 0.056 sec.
dai-MS-7B89 :) select * from test
SELECT *
FROM test
Query id: 3ba7cd7f-4621-4286-8646-79737ec3e763
┌───────────timestamp─┬─contractid─┬─userid─┐
│ 2022-02-02 00:00:00 │          1 │      1 │
└─────────────────────┴────────────┴────────┘
1 row in set. Elapsed: 0.030 sec.
When the same data is inserted twice into this ReplicatedMergeTree table, ClickHouse deduplicates the second block, so the two INSERTs leave only one row in the table.
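Deduplication is keyed on the content of the written block, not on the primary key. A quick sanity check, sketched against a hypothetical copy of the table (test_copy, not part of the session above): a row that differs in any value hashes to a different block ID and is stored normally.

insert into test_copy (userid, contractid, timestamp) values (1, 1, '2022-02-02');
insert into test_copy (userid, contractid, timestamp) values (1, 1, '2022-02-02'); -- identical block, deduplicated
insert into test_copy (userid, contractid, timestamp) values (2, 1, '2022-02-02'); -- different content, kept

select count() from test_copy; -- 2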
How to fix it
ClickHouse exposes a setting, insert_deduplicate, that controls whether insert deduplication is applied:
set insert_deduplicate=0;
After that, re-inserting the same row is no longer dropped by deduplication (a per-query alternative is sketched after the output below):
insert into test (userid, contractid, timestamp) values (1, 1, '2022-02-02');
INSERT INTO test (userid, contractid, timestamp) FORMAT Values
Query id: a8df989b-0b63-4b45-a1b8-22c13b18bf0a
Ok.
1 row in set. Elapsed: 0.070 sec.
dai-MS-7B89 :) select * from test
SELECT *
FROM test
Query id: e077b55e-bfd9-4678-ae46-9fc05714b3f7
┌───────────timestamp─┬─contractid─┬─userid─┐
│ 2022-02-02 00:00:00 │          1 │      1 │
└─────────────────────┴────────────┴────────┘
┌───────────timestamp─┬─contractid─┬─userid─┐
│ 2022-02-02 00:00:00 │          1 │      1 │
└─────────────────────┴────────────┴────────┘
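Note that SET insert_deduplicate = 0 changes the setting for the whole session. If you only want to bypass deduplication for a single statement, the setting can also be attached to the INSERT itself; this is a sketch assuming your ClickHouse version accepts a SETTINGS clause on INSERT:

insert into test (userid, contractid, timestamp)
settings insert_deduplicate = 0
values (1, 1, '2022-02-02');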
Log and source code analysis
Log analysis
The server log below comes from an INSERT that was deduplicated: the part is written and its block ID is computed, but the write is then ignored because a block with the same ID already exists locally as part 202202_0_0_0.
2022.05.15 23:32:04.515912 [ 68323 ] {64b40d4f-0d00-4747-9af3-4afb56b6a84b} <Trace> MergedBlockOutputStream: filled checksums 202202_2_2_0 (state Temporary)
2022.05.15 23:32:04.517872 [ 68323 ] {64b40d4f-0d00-4747-9af3-4afb56b6a84b} <Debug> default.test (7d656761-7cd0-4866-a43e-f0e4cea97654) (Replicated OutputStream): Wrote block with ID '202202_8166901380224458449_12408515745921908624', 1 rows
2022.05.15 23:32:04.533981 [ 68323 ] {64b40d4f-0d00-4747-9af3-4afb56b6a84b} <Information> default.test (7d656761-7cd0-4866-a43e-f0e4cea97654) (Replicated OutputStream): Block with ID 202202_8166901380224458449_12408515745921908624 already exists locally as part 202202_0_0_0; ignoring it.
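The block IDs that drive this check are kept in ZooKeeper under the table's blocks/ node and can be inspected through the system.zookeeper table. The path below is illustrative; substitute your actual {cluster} and {shard} macro values from the CREATE TABLE statement:

SELECT name
FROM system.zookeeper
WHERE path = '/clickhouse/tables/my_cluster/01/default/test/blocks';
-- each child node name is a block ID such as 202202_8166901380224458449_12408515745921908624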
Debugging clickhouse-server with lldb:
lldb ./clickhouse-server
The deduplication block ID is generated in ReplicatedMergeTreeSink::consume:
void ReplicatedMergeTreeSink::consume(Chunk chunk)
{
    auto block = getHeader().cloneWithColumns(chunk.detachColumns());

    /// ... (code elided; temp_part and current_block below come from the elided part-writing code) ...

    String block_id;

    if (deduplicate)    /// set earlier, controlled by the insert_deduplicate setting
    {
        String block_dedup_token;    /// token used when generating the block ID

        /// We add the hash from the data and partition identifier to deduplication ID.
        /// That is, do not insert the same data to the same partition twice.
        const String & dedup_token = settings.insert_deduplication_token;
        if (!dedup_token.empty())
        {
            /// multiple blocks can be inserted within the same insert query
            /// an ordinal number is added to dedup token to generate a distinctive block id for each block
            block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
            ++chunk_dedup_seqnum;
        }

        block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
        LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
    }
    else
    {
        LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows());
    }
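The branch that reads settings.insert_deduplication_token shows that the block ID does not have to come from the data: when a token is supplied, it (with a per-block ordinal) is fed into getZeroLevelPartBlockID, so the client decides which inserts count as duplicates. A hedged sketch of using this to make a retried load idempotent while still allowing identical rows from a different load; the token values are arbitrary examples, and the SETTINGS-on-INSERT syntax is assumed to be supported by your version:

insert into test (userid, contractid, timestamp)
settings insert_deduplication_token = 'load-2022-02-02-a'
values (1, 1, '2022-02-02');

-- Re-running the statement above with the same token is deduplicated.
-- The same data under a different token gets a new block ID and is inserted:
insert into test (userid, contractid, timestamp)
settings insert_deduplication_token = 'load-2022-02-02-b'
values (1, 1, '2022-02-02');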