0%

OpenTSDB 查询优化

OpenTSDB 简介

OpenTSDB是一个基于HBase的可扩展时间序列数据库,支持数百万每秒的读写,与OpenTSDB的交互主要是通过运行一个或多个TSD来实现的。每个TSD是独立的,没有主设备,没有共享状态,因此您可以根据需要运行任意数量的 TSD,以处理向其施加的任何负载。

Cloudera Express管理并运行了生产环境的 HDFS/HBase/ZooKeeper。

项目背景

OpenTSDB存储了SDN设备间的流量数据,粒度包括:源IP、目的IP、上下行流量、服务类型、协议等。

流量统计接口按时间区间直接查询了OpenTSDB的原始数据,未经Rollup处理。

查询瓶颈

客户BJ联通反馈某台SDN设备11月初的流量统计接口返回超时。测试发现其他设备流量统计正常,联通设备查询时候有如下错误日志:

1
2
3
4
16:14:13.466 ERROR [RegionClient.exceptionCaught] - Unexpected exception from downstream on [id: 0xbbc6970f, /172.31.250.10:50790 => /172.31.120.131:16020]
com.stumbleupon.async.CallbackOverflowError: Too many callbacks in Deferred@1273979129(state=PENDING, result=null, callback=net.opentsdb.tsd.HttpJsonSerializer$1DPsResolver@682fd571 -> net.opentsdb.tsd.HttpJsonSerializer$1DPsResolver@3061522a -> net.opentsdb.tsd.HttpJsonSerializer$1DPsResolver@27fac471 -> net.opentsdb.tsd.HttpJsonSerializer$1DPsResolver@17a290df -> net.opentsdb.tsd.HttpJsonSerializer$1DPsResolver@1304b07 ->
//....
passthrough -> passthrough) (size=16383) when attempting to add cb=net.opentsdb.tsd.HttpJsonSerializer$1DPsResolver@29bbd6fc@700176124, eb=passthrough@1260931085

Github上有人2016年就遇到该错误,CallbackOverflowError with many tag values,issue至今未关闭

翻阅了OpenTSDB报错代码段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// com.stumbleupon:async:1.4.1
public final class Deferred<T> {

private static final short MAX_CALLBACK_CHAIN_LENGTH = (1 << 14) - 1;

//....
//com.stumbleupon.async.Deferred#addCallbacks
public <R, R2, E> Deferred<R> addCallbacks(final Callback<R, T> cb,
final Callback<R2, E> eb) {
//...

else if (last_callback == callbacks.length) {
final int oldlen = callbacks.length;
if (oldlen == MAX_CALLBACK_CHAIN_LENGTH * 2) {
throw new CallbackOverflowError("Too many callbacks in " + this
+ " (size=" + (oldlen / 2) + ") when attempting to add cb="
+ cb + '@' + cb.hashCode() + ", eb=" + eb + '@' + eb.hashCode());
}
final int len = Math.min(oldlen * 2, MAX_CALLBACK_CHAIN_LENGTH * 2);
final Callback[] newcbs = new Callback[len];
System.arraycopy(callbacks, next_callback, // Outstanding callbacks.
newcbs, 0, // Move them to the beginning.
last_callback - next_callback); // Number of items.
last_callback -= next_callback;
next_callback = 0;
callbacks = newcbs;
}

//...
}
}

MAX_CALLBACK_CHAIN_LENGTH限制了Deferred链的长度,而进入到上面addCallbacks方法的为聚合分类后的每条数据,估计联通的问题就是处理数据超过了该限制。

测试也发现缩短联通设备的查询时间区间是能正常返回的,并且2019-11-06 8:00:00 - 2019-11-06 9:00:00的数据就近1.5W条返回,也就是说联通设备目的IP近1.5W个,大多数IP流量不多。

Deferred类是Java库提供一些有用的构建模块去 构建高性能,多线程,异步的java应用。它的实现灵感来自Twisted的异步库(twisted.internet.defer)。
Deferred允许你轻松地构建异步的处理链,这个处理链必须触发,当一个异步的事件(I/O,RPC 以及其它)完成。它能被广泛用于构建异步API,在一个多线程服务器或者是客户端中。

着手解决

OpenTSDB并不提供返回结果限制参数,至少2.4版本没有。

华为云CloudTable集成的OpenTSDB服务支持了结果限制,也就是query api提供了一个limit参数。

流量统计接口是统计该设备的TOP10流量,因此设置更大的MAX_CALLBACK_CHAIN_LENGTH没有意义:返回结果多,查询慢。

解决办法就是过滤进入到addCallback的数据大小:

1、OpenTSDB的查询最后准备返回结果的时候才通过三行代码过滤掉不在该时间区间的数据,

1
2
3
4
5
6
7
8
9
//net/opentsdb/tsd/HttpJsonSerializer.java:835
for (final DataPoint dp : dps) {
if (dp.timestamp() < data_query.startTime() ||
dp.timestamp() > data_query.endTime()) {
continue;
}

//...
}

因此一个优化就是在进入addCallback就过滤掉这一部分数据。

2、提供一个limit参数,限制返回结果集大小。这里分两步:一是进入addCallback前通过TreeSet自动排序,限制最多limit条数据会进入处理,二是OutputStream写入时候,排序后截取前limit条数据返回。

关键是提前过滤不在时间区间的数据点及TreeSet自动排序过滤掉大部分小流量的IP数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class HttpJsonSerializer extends HttpSerializer {

//net.opentsdb.tsd.HttpJsonSerializer#formatQueryAsyncV1
public Deferred<ChannelBuffer> formatQueryAsyncV1(final TSQuery data_query,
final List<DataPoints[]> results, final List<Annotation> globals)
throws IOException {

//...

final Deferred<Object> cb_chain = new Deferred<Object>();
TreeSet<Number> numberTreeSet = LongStream.range(0 - limit, 0).mapToDouble(i -> 1.0 * i).boxed().collect(Collectors.toCollection(TreeSet::new));

for (DataPoints[] separate_dps : results) {
for (DataPoints dps : separate_dps) {
try {
//filter empty dps, laudukang
boolean isDpsOk = false;

for (final DataPoint dp : dps) {
if (dp.timestamp() >= data_query.startTime() &&
dp.timestamp() <= data_query.endTime()) {
isDpsOk = true;
break;
}
}

if (isDpsOk && limit > 0) {
Number dpsSum = dps.sumDps();
if (numberTreeSet.first().doubleValue() < dpsSum.doubleValue()) {
numberTreeSet.pollFirst();
numberTreeSet.add(dpsSum.doubleValue());
} else {
//filter
isDpsOk = false;
}
}

if (isDpsOk) {
cb_chain.addCallback(new DPsResolver(dps));
}
} catch (Exception e) {
throw new RuntimeException("Unexpected error durring resolution", e);
}
}
}

//...
}
}

如query api传入参数包含limit,则先将数据写入到ByteArrayOutputStream(这样子源代码改动小),再从ByteArrayOutputStream中反序列化获得原响应数据,倒序并截取前limit条数据写入到OutputStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//net.opentsdb.tsd.HttpJsonSerializer#sortDps
@SuppressWarnings("unchecked")
private void sortDps(long limit, ByteArrayOutputStream stream, OutputStream output) throws IOException {
byte[] content = stream.toByteArray();
TypeReference<List<Map<String, Object>>> typeReference = new TypeReference<List<Map<String, Object>>>() {
};

List<Map<String, Object>> dataList = JSON.parseToObject(content, typeReference);

List<Map<String, Object>> resultMapList = dataList.stream()
.filter(m -> !m.containsKey("metric"))
.collect(Collectors.toList());

List<Map<String, Object>> metricNodeMapList = dataList.stream()
.filter(m -> m.containsKey("metric"))
.collect(Collectors.toList());

//sum dps for sort
metricNodeMapList.forEach(m -> {
double value = ((Map<String, Object>) m.get("dps")).values().stream()
.mapToDouble(i -> {
if (i instanceof Integer) {
return ((Integer) i).doubleValue();
}
return (double) i;
})
.sum();

m.put("sumDps", value);
});

Comparator<Map<String, Object>> dpsSorter = Comparator.comparingDouble(m -> (double) m.get("sumDps"));
List<Map<String, Object>> metricLimitNodeMapList = metricNodeMapList.stream()
.sorted(dpsSorter.reversed()).limit(limit)
.collect(Collectors.toList());

resultMapList.addAll(metricLimitNodeMapList);


final JsonGenerator json = JSON.getFactory().createGenerator(output);
json.writeObject(resultMapList);
}

编译测试

参考OpenTSDB General Development Compiles OpenTSDB and generates a Debian package,卸载官方版本后安装。

修改 laudukang/opentsdb-okhttp-client 让其支持limit查询参数。

测试显示查询耗时明显缩短,而且响应大小也减少了,不再返回几MB的json数据。

查询例子
POST /api/query?summary

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
"limit": 10,//限制返回条目数,倒序
"delete": false,
"end": 1573012800000,
"globalAnnotations": false,
"msResolution": false,
"noAnnotations": false,
"queries": [
{
"aggregator": "sum",
"downsample": "0all-sum",
"explicitTags": false,
"filters": [
{
"filter": "*",
"groupBy": true,
"tagk": "dst_ip",
"type": "wildcard"
},
{
"filter": "CONNECTION_OVERSEAS",
"groupBy": false,
"tagk": "service_type",
"type": "literal_or"
}
],
"metric": "1164.conn.total.in",
"preAggregate": false,
"rate": false,
"useMultiGets": false
}
],
"showQuery": false,
"showStats": false,
"showSummary": false,
"showTSUIDs": false,
"start": 1572998400000,
"timezone": "UTC",
"useCalendar": false
}

response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[
{
"metric": "1164.conn.total.in",
"tags": {
"protocol": "TCP",
"service_type": "CONNECTION_OVERSEAS",
"dst_ip": "107.167.89.234"
},
"aggregateTags": [
"src_ip"
],
"dps": {
"157296960": 17588304652
},
"sumDps": 17588304652
},
{
"metric": "1164.conn.total.in",
"tags": {
"src_ip": "240.16.1.109",
"protocol": "TCP",
"service_type": "CONNECTION_OVERSEAS",
"dst_ip": "152.199.39.180"
},
"aggregateTags": [],
"dps": {
"157296960": 9804591660
},
"sumDps": 9804591660
},

//余下8条省略...
]

提供一个编译好的安装包:
Debian package: opentsdb-2.4.0_all.deb

Debian package: opentsdb-2.4.0_all.deb Backup Url

1
2
>md5sum opentsdb-2.4.0_all.deb
22c98200d548b04ef8048922c9886cd0 opentsdb-2.4.0_all.deb

推荐一个类库

推荐一个OpenTSDBJava类库OpenTSDB Java Async Client,封装了一下几个方法,开箱即用:

1
2
3
4
5
6
7
8
9
default String buildUrl(String serviceUrl, String postApiEndPoint, ExpectResponse expectResponse)

default String pushDataPointsString(DataPointBuilder builder, ExpectResponse expectResponse)

default String pushQueriesString(QueryBuilder builder, ExpectResponse expectResponse)

default void asyncPushDataPoints(DataPointBuilder builder, ExpectResponse expectResponse, Callback callback)

default QueryResponse pushQueries(QueryBuilder builder, ExpectResponse expectResponse)

Spring boot init OpenTSDBService example:

1
2
3
4
5
6
7
8
9
@Bean
public OpenTSDBService openTSDBService() {
List<String> openTSDBServerList = configProperties.getOpentsdbServer();
if (CollectionUtils.isEmpty(openTSDBServerList)) {
throw new IllegalArgumentException("empty opentsdb server url");
}

return () -> openTSDBServerList;
}

Thanks to JetBrains‘ support

jetbrains_support.png

部分资料