Timbo Site

write something


ClickHouse与超小型数据平台

ClickHouse是一个非常强大的OLAP数据库,初次接触时被它强劲的性能震撼到了,我将它投入到了正在建立的超小型数据平台中使用,以下是我这半年工作的记录。

开始筹划

超小型数据平台是产品经理在2020年年末的提议,用途是收集数据,将数据结果展示。因此我被产品经理抓了好几个月,让我在2021年过中国春节前把预算做好。当时估了个Hadoop平台的方案,每个数据中心需要安排10+台高配置实例,费用也相当高。

之所以叫超小型数据平台,是因为经过完整评估后觉得数据量不大,2021年年底一天数据量只有几千万左右。需求大部分是T+1报表,这个数据量上Hadoop平台有点大材小用,我需要寻找更简单的数据仓库、数据分析工具。

中国春节后回来发现了ClickHouse,在机器上一个Docker拉起来,照着文档找了几十亿数据塞进去,无论怎么COUNT、DISTINCT都是瞬间返回结果,全表几十亿数据求和不到1秒返回结果;复杂操作,如两表JOIN,还会有进度条给出预估结束时间和内存占用。我大受震撼,翻了很长时间的文档进行学习,仔细评估之后,我定制了如下的配置:

用途 配置 数量
应用 2C4G 2
Kafka 4C8G 3
ClickHouse 8C16G 1

机器数量是之前计划的一半,配置不需要很高,预算砍了一半之后又再砍了一半。

ClickHouse的硬件要求写得很清楚:

ClickHouse implements parallel data processing and uses all the hardware resources available. When choosing a processor, take into account that ClickHouse works more efficiently at configurations with a large number of cores but a lower clock rate than at configurations with fewer cores and a higher clock rate. For example, 16 cores with 2600 MHz is preferable to 8 cores with 3600 MHz.

更多的核心比更高的主频有用,而内存的要求十分迷你,最低4GB内存,如果需要GROUP BY、DISTINCT、JOIN的话就需要更多内存,虽然这些场景都没有,但为了保险起见,我申请了8C16G的配置,避免频繁升级配置。

架构设计

配置和预算都提交申请之后,我开始设计数据结构和数据流。

数据格式

数据上传走的HTTPS接口,数据使用Protobuf V3封装,接口上用Content-Type限制住,还加了RateLimit、时间有效性检查,在无数据加密防护下能够提供较高的接口安全,在服务器端也有较少的CPU消耗。

为了让Protobuf更好压缩数据、减少流量消耗,定制数据格式的时候我将公共字段抽取出来,同类型的数据放在同一个List里,较大体积的数据压缩之后再放入List中,经过验证之后,与同数据量的RAW JSON相比,Protobuf帮助压缩这些数据到只有20-50%大小。

数据流入接口

接口使用了Webflux + Netty接受数据,用1000 Client对应用进行压测,在t3a.micro(2C1G)的实例上跑出了6000QPS的成绩,如果需要更高的QPS,升级配置或者增加机器。

接口收入数据之后,这些数据会按照类型分拆,打上标记,放入特定Kafka Topic中。

这个应用被命名为chainsaw。

数据处理

上游将数据放入Kafka之后,下游在接收到数据之后开始进行处理。

流程有点简单,同样是把数据取出,打上一些标记,然后送入ClickHouse。

这个应用被命名为lumberjack。

报表生成

通过触发定时任务,将必要的数据抽取后送入ClickHouse。再次通过触发定时任务,生成报表。

这个应用被命名为job。

架构图

综上,该架构设计为:

arch

数据经过每一个节点都会被打上一些标签,其中带有经过每个节点的时间,通过这些时间标签,我可以算出数据经过多长时间上传到服务器,经过多长时间处理放入Kafka,在Kafka停留了多长时间开始被处理,又在什么时候落入数据库,从而我可以监测出来数据流动在哪个节点上出现缓慢。

当数据流动出现缓慢时,每个节点可以横向进行扩展缓解:

  • 流量变大出现缓慢,扩chainsaw
  • 消息处理缓慢,扩lumberjack
  • 落库缓慢,ClickHouse升配或扩为集群
  • Kafka不会慢,每秒百万条消息时再考虑
  • 数据量变多时Job会慢,但它有6个小时的宽限期来慢慢处理

此时架构变为:

extend arch

这次设计和成型都十分快,初期的架构设计、代码、部署和投入使用在很短的时间内完成了。

数据仓库

和上述的架构相比,ClickHouse的使用更为重要。

翻ClickHouse文档的时候觉得里面的很多定义和规范很有意思:

  • SQL方言稍微接近MySQL,表名和列名都是大小写敏感,使用多年Postgres的我需要适应
  • 十分重视表DDL,需要指定ORDER BY,隐式指定PRIMARY KEY,可选PARTITION BY指定分区键
    • PARTITION BY还可以指定以函数为键,严格的同时又有稍微放宽的地方,很有趣
  • 指定列为空时,需要使用Nullable配合类型指定
  • 没有Boolean类型:
  • INSERT语句和标准SQL无差异,但:
    • 同时向几个分区INSERT数据会大大降低INSERT性能
      • 应尽可能以大批次INSERT,如一次INSERT 100000条数据
      • 通过一个分区键分组后批量INSERT
    • 性能不会下降的情况:
      • 数据实时INSERT
      • 数据按照时间排序INSERT
  • ……

把ClickHouse的文档翻了很多遍之后,我认为自己建表已经万无一失了。

建表的确是没问题,使用上有问题。数据是一批批上传到服务器,一条条落入数据库。

compact row

每一次INSERT操作会触发一次compact part生成,而1条数据和100条数据的大小并不是1:100,更大可能是1:4。单条数据源源不断流入数据库,磁盘空间占用会先大量增长,在大量compact part被慢慢合并成wide part后,磁盘空间又会慢慢吐出。

disk usage 1

改进应用

增加每次INSERT的数据量

虽然数据实时INSERT不会引起性能下降,但磁盘用量有点夸张,早期忙时会先吃掉30G,闲时合并文件吐出27G,用户量上来之后磁盘占用起起落落就非常巨大。

为了将数据更能一批批而不是一条条落入数据库,lumberjack那边的KafkaListener我开始用batch mode,将一次从Kafka获取数据的时间放长,将这批数据合并送入数据库,尽可能增加一次INSERT的数据量以减少磁盘占用。

代码上从:

@KafkaListener
public void consume(String data) {
  // process
}

改为:

@KafkaListener(containerFactory = "batchFactory")	// define batchFactory befordhand
public void consume(List<String> data) {
  // process
}

部署上线后效果并不明显,看概率:高峰时每秒数据量很多,但并不集中在某一秒或平均到每一秒上,闲时则没什么帮助,但是看着compact part时有20-100多的数据量,心里稍微有点安慰,因为我又节省了这次INSERT产生的90-99%的磁盘空间。

以Kotlin重写应用

进行数据处理时,有时候需要提取attr属性:

{
  "a": {
    "b": {
      "c": {
        "attr": "value"
      }
    }
  }
}

在Java中,将该JSON反序列化回来后,要使用attr,需要进行级联判空,代码类似于:

SomeObj obj = objectMapper.readValue(json, SomeObj.class);
if (obj.getA() != null && obj.getA().getB() != null && obj.getA().getB().getC() != null) {
  // handle attr
}

而Kotlin时中使用safe call,代码类似于:

val a = objectMapper.readValue<SomeObj>(json)
val v: String? = a?.b?.c?.attr
// handle attr

面对成篇判空的代码,换成Kotlin的时候safe call就很干净,加上Kotlin中万物可let/when/apply,代码写起来特别简练,管道式的写法让代码像可插拔的模块一样。于是我用Kotlin将原有项目重写了一遍。

不幸的是,Kotlin并没有想象中的那么完美,如重复注解(如多个@PropertySource)不支持,需要找支持重复注解的其他注解(如@PropertiesSource);注解中字段不是基础类型时需要显式指定类型;要写.class有时候需要动脑想是::class还是::class.java;用Jackson反序列化JSON时objectMapper.readValue<SomeObj>(json)的写法非常直观,但写完之后还要等IDE告诉我有错误,等待import com.fasterxml.jackson.module.kotlin.readValue提示点一下才能好;由于Kotlin类型后置,在IDE进行提示的时候CPU占用都会蹭的一下上去,写得稍微快一点CPU就35-50%的占用率,风扇呼呼转就算了,高速写代码时我还得时不时停一下等IDE缓过神来才能继续往下写。

写完之后上线,CPU占用率从峰值2.5%升高到了4.2%,看性能指标的时候我当时都傻了:

chainsaw cpu usage

因为从Java切到Kotlin时,我把JVM的版本也从15切回8,GC也换掉了,也有可能是JVM的问题而不是Kotlin的问题更不可能是我的问题

chainsaw memory pools

流量翻倍后,目前CPU的峰值为7.3%,性能近似线性;需要新增指标时,代码添加也很简单,只需要在流程中加一行代码,新增提取指标的方法,就好了,维护起来非常方便,往往是语音会议上我一边看着文档一边改代码,会议开完了我也正好能在需求文档上打钩,标记完成。

ClickHouse的特殊用途

后来一些需求都有点古怪,但都正好利用了ClickHouse的一些特性,做得又快又漂亮。

SummingMergeTree和统计

有个统计需求,统计每天的数据来源,但不需要关心每个时段的总和。

ClickHouse中的SummingMergeTree能够自动为相同Primary Key的数据进行聚合,插入同样Primary Key的数据就会自动加一,于是有了这样的一个表:

CREATE TABLE source_stat
(
  Country     String,
  Subdivision String,
  City        String,
  Counter     String,
  CreateTime  DateTime
)	ENGINE = SummingMergeTree(Counter)
	PARTITION BY toYYYYMMDD(CreateTime)
	ORDER BY (Country, Subdivision, City, CreateTime);

这样就不需要跑报表,拉特定时间范围数据就能拉出结果:

Country Subdivision City Counter CreateTime
Albania Berat District Berat 1 2021-06-01 00:00:00
Albania Durrës District Durrës 1 2021-06-01 00:00:00

那如果需要每个时段的总和呢?也很简单,但这个问题要留给各位观众老爷想。

带TTL的表和统计

ClickHouse能够为表指定TTL(Time To Live),数据在表中超过存活时间后会自动删除,可以利用这个特性玩出很多花样。

如有个统计需求,需要统计消息到达用户那边之后,用户一天内会不会反馈,反馈需要多长时间。根据这样的需求,定制两张表:

CREATE TABLE record
(
  Belong_To       String,
  Type            String,
  Interval_Minute UInt64,
  ...
  CreateTime      DateTime
) ENGINE = MergeTree
	PARTITION BY toYYYYMM(CreateTime)
	ORDER BY CreateTime;
CREATE TABLE record_tmp
(
  Belong_To   String,
  Type        String,
  ...
  CreateTime  DateTime
) ENGINE = MergeTree
	PARTITION BY toYYYYMM(CreateTime)
	ORDER BY CreateTime
	TTL CreateTime + toIntervalDay(1);

消息发送时,先在record_tmp表中记录,消息确认之后,在record_tmp表中查找并计算Interval_Minute,接着落入record表中成为事实。

实际具体需求比这个要复杂一些,但同样需要看使用者的表现。

数据展示和报表统计

很早之前看到个说法,记录了下来:

moments

但我使用Superset的时候,发现Superset是真心难用我组除了我都会用Superset,前几天我们组刚把Superset升级到最新版本,实际上就是我菜

比如他自带的driver对ClickHouse的支持很差,用时发现不支持LowCardinality(String):works wrong for specific clickhouse type LowCardinality(String) #58,需要手动升级;不支持sankey loop diagram,转化率和跳转图做得不够好,只能用sankey diagram做单向图,但看代码又有sankey loop diagram支持…我就搞了好长时间viz.py和viz_sip38.py往里面添加功能,结果性能很差,每次查的时候都能把浏览器搞爆,我就跑去用Grafana了

Grafana很好用,ClickHouse的查询从来没出现过奇怪的Bug,可以做这样高精度且清晰的图,可精细到街道:

region traffic map

还有很方便的Copy和Duplicate功能,只要做好一个图表,直接复制粘贴改改里面的值就好了,开发效率很快。

当然也不是一帆风顺,我们一起做Dashboard时经常发现某个图表没问题,升级后有问题,再升级就没问题,或是反过来:如我们在今年7月21日,在用7.X版本的饼图,一切正常,升级到8.0.0版本之后,比较少的数据量就会导致整个饼图数据不正常,再升级到8.0.6,就好了,但是其他图又挂了。实际上就是升级了版本之后出问题,如果不升就不会有问题。

后来我们碰到Grafana问题解决思路是:

  • 有没有新版本:
    • 有:升级,看问题还存不存在
      • 有:去Issues列表看看有没有人有同样问题,他们怎么解决
        • 有:根据他们的办法解决
        • 没有:等新版本或上RC版本,只要你敢出我就敢用
      • 没有:好耶
    • 没有:去Issues列表看看有没有人有同样问题,他们怎么解决
      • 有:根据他们的办法解决
      • 没有:等新版本或上RC版本,只要你敢出我就敢用

也有解决不了的问题,比如现在我们在用Grafana 8.1.3版本,有个Bar Gauge从8.0.0开始就一直不正常:

bug bar gauge

问题根源是:Panel无法为Max Value设置一个合理的值。将Max Value手动设置为65000时,这个图表就正常了:

bar gauge

但是我统计时间范围只要一扩大,排名第一的数值超过65000,那这个图表又开始不正常了…看了一眼Grafana出了8.2.2版本,明天再升级看看

ClickHouse明明很强大但过度谨慎

前面提到了ClickHouse的磁盘占用问题,按照上面数据增长图表来看,在中国的春节期间,所有数据中心ClickHouse的磁盘正好吃满,而我在进行ALTER TABLE DELETE操作的时候,发现表空间竟然是缓慢释放…急的时候真的是不行,还得想办法解决这个问题。

好在ClickHouse也有RENAME,上面的架构正好提供任意时段维护ClickHouse的手段:

  1. 将lumberjack应用关掉,此时不会有数据流入ClickHouse
  2. lumberjack应用关闭不影响chainsaw接受数据,因为数据被缓冲在Kafka里
  3. 维护好ClickHouse之后,打开lumberjack消费Kafka中的数据
  4. 避开job定时任务开跑的时间

重新整理数据表,则可以将原表数据导入到新表:

  1. 如需要整理A表,则可以新建一个同构的A_TMP表
  2. 将A表数据导入A_TMP表
  3. 将A表RENAME为A_OLD
  4. 将A_TMP表RENAME为A表
  5. 将A_OLD表删掉

通过这样的指令,将大部分磁盘占用归还回来。

rename to free

心得

写这文章发现了没开启ClickHouse的async insert,开启这个选项能够让ClickHouse帮忙合并数据减少INSERT次数,学无止境,常学常新。

文章构思于2021年10月29日凌晨0点到3点,晚上ClickHouse的群组置顶了一条信息,让我回想起使用ClickHouse的经历,真是一段快乐的时光。

clickhouse raise