WuKong System

(9 mins to read)

global.hpp

全局配置变量。

config.hpp

配置设置。

type.hpp

  • tripe_t: spo
  • triple_attr_t: sav

string_server.hpp

string <-> int map

rdma.hpp

mem.hpp

一个全局预分配的自行划分的内存空间。

store/vertex.hpp

  • ikey_t: dir (1 bit) + t/pid (17 bits) + vid (46 bits)

与传统的graph转kvstore的key为点,value为边+邻居不同,这里其实是把点和它上面的边作为key,而只把邻居作为value。

predicate index就是把边(predicate)当作一个点,而type index就是对type标签的边再特殊处理一下,把实际的type当作一个点。作为两个索引。

具体地说,t/pid = 0表示value为predicate index;t/pid = 1表示value为type index;vid = 0表示value为normal index。

这样,可以通过<vid + 0 + 0/1>拿到vid的入边和出边,通过<vid + 1 + 1>拿到vid的类型,通过<0 + tid + 0>拿到特定类型的点,通过<0 + pid + 0/1>拿到有特定标签的边的点。

  • iptr_t:用来做动态图快照的

  • vertex_t:key + ptr

  • edge_t:只有邻居的编号

store/cache.hpp

一个带过期时间的LFU,在通过key拿value的时候,如果在cache里就不需要再通过RDMA读了。

  • lookup
  • insert

store/gstore.hpp

具体细节在drtm-sosp15这篇论文中。

如果vid是0,则肯定是predicate/type index,在每个machine上都有replicate,所以直接从本地拿,否则根据differentiate partition(其实实现就是一个取模哈希,roundrobin)看是本地还是远程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
edge_t *get_edges(int tid, sid_t vid, sid_t pid, dir_t d, uint64_t &sz,
int &type = *(int *)NULL) {
// index vertex should be 0 and always local
if (vid == 0)
return get_edges_local(tid, 0, pid, d, sz);

// normal vertex
if (wukong::math::hash_mod(vid, Global::num_servers) == sid)
return get_edges_local(tid, vid, pid, d, sz, type);
else
return get_edges_remote(tid, vid, pid, d, sz, type);
}

// Get local edges according to given vid, pid, d.
// @sz: size of return edges
edge_t *get_edges_local(int tid, sid_t vid, sid_t pid, dir_t d, uint64_t &sz,
int &type = *(int *)NULL) {
ikey_t key = ikey_t(vid, pid, d);
vertex_t v = get_vertex_local(tid, key);

if (v.key.is_empty()) {
sz = 0;
return NULL; // not found
}

// local edges
edge_t *edge_ptr = &(edges[v.ptr.off]);

sz = v.ptr.size;
if (&type != NULL)
type = v.ptr.type;
return edge_ptr;
}
// Get remote edges according to given vid, pid, d.
// @sz: size of return edges
edge_t *get_edges_remote(int tid, sid_t vid, sid_t pid, dir_t d, uint64_t &sz,
int &type = *(int *)NULL) {
ikey_t key = ikey_t(vid, pid, d);
vertex_t v = get_vertex_remote(tid, key);

if (v.key.is_empty()) {
sz = 0;
return NULL; // not found
}

// remote edges
int dst_sid = wukong::math::hash_mod(vid, Global::num_servers);
edge_t *edge_ptr = rdma_get_edges(tid, dst_sid, v);
while (!edge_is_valid(v, edge_ptr)) { // check cache validation
// invalidate cache and try again
rdma_cache.invalidate(key);
v = get_vertex_remote(tid, key);
edge_ptr = rdma_get_edges(tid, dst_sid, v);
}

sz = v.ptr.size;
if (&type != NULL)
type = v.ptr.type;
return edge_ptr;
}

store/static_gstore.hpp

  • insert_triples
  • insert_attr

store/meta.hpp

bind.hpp

console.hpp

comm/tcp_adaptor.hpp

基于ZMQ的一对一通信。

comm/tcp_broadcast.hpp

基于ZMQ的一对多通信。

mm/malloc_interface.hpp

loader/loader_interface.hpp

loader/posix_loader.hpp

query.hpp

dgraph.hpp

wukong.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void *engine_thread(void *arg)
{
Engine *engine = (Engine *)arg;
if (enable_binding && core_bindings.count(engine->tid) != 0)
bind_to_core(core_bindings[engine->tid]);
else
bind_to_core(default_bindings[engine->tid % num_cores]);

engine->run();
}
void *proxy_thread(void *arg)
{
Proxy *proxy = (Proxy *)arg;
if (enable_binding && core_bindings.count(proxy->tid) != 0)
bind_to_core(core_bindings[proxy->tid]);
else
bind_to_core(default_bindings[proxy->tid % num_cores]);

// run the builtin console
run_console(proxy);
}
// launch all proxies and engines
pthread_t *threads = new pthread_t[Global::num_threads];
for (int tid = 0; tid < Global::num_proxies + Global::num_engines; tid++) {
// TID: proxy = [0, #proxies), engine = [#proxies, #proxies + #engines)
if (tid < Global::num_proxies)
pthread_create(&(threads[tid]), NULL, proxy_thread,
(void *)proxies[tid]);
else
pthread_create(&(threads[tid]), NULL, engine_thread,
(void *)engines[tid - Global::num_proxies]);
}