Server
初始化
在使用服务之前,必须得先对服务所需的一些结构进行初始化。
searpc-server.c
void
searpc_server_init (RegisterMarshalFunc register_func)
{
marshal_table = g_hash_table_new_full (g_str_hash, g_str_equal,
NULL, (GDestroyNotify)marshal_item_free);
service_table = g_hash_table_new_full (g_str_hash, g_str_equal,
NULL, (GDestroyNotify)service_free);
register_func ();
}
可以看到这里涉及到marshal_table
、service_table
以及register_func
(也就是RegisterMarshalFunc
)。
marshal_table
和service_table
是两块hash
表:
searpc-server.c
static GHashTable *marshal_table;
static GHashTable *service_table;
而RegisterMarshalFunc
可以理解为数据编解码的注册函数,事实上在已有的代码里是搜不到它的实现的,在下一章节会详细介绍。
searpc-server.c
typedef void (*RegisterMarshalFunc) (void);
服务创建
先来看看service_table
这个hash
表的主要作用。
libsearpc
通过服务名创建出一个SearpcService
并将其注册到service_table
这个hash
表中,后续可通过服务名在hash
表中找到我们对应的服务。
searpc-server.c
typedef struct {
char *name;
GHashTable *func_table;
} SearpcService;
...
int
searpc_create_service (const char *svc_name)
{
SearpcService *service;
if (!svc_name)
return -1;
if (g_hash_table_lookup (service_table, svc_name) != NULL)
return 0;
service = g_new0 (SearpcService, 1);
service->name = g_strdup(svc_name);
service->func_table = g_hash_table_new_full (g_str_hash, g_str_equal,
NULL, (GDestroyNotify)func_item_free);
g_hash_table_insert (service_table, service->name, service);
return 0;
}
在SearpcService
结构中存储了服务名以及存在要调用的函数hash
表。
也就是在service_table
表中存储了SearpcService
服务的指针,其key
是服务名。
编解码函数注册
再看看marshal_table
这个hash
表的主要作用:
searpc-server.c
gboolean
searpc_server_register_marshal (gchar *signature, SearpcMarshalFunc marshal)
{
MarshalItem *mitem;
g_assert (signature != NULL && marshal != NULL);
if (g_hash_table_lookup (marshal_table, signature) != NULL) {
g_warning ("[Sea RPC] cannot register duplicate marshal.\n");
g_free (signature);
return FALSE;
}
mitem = g_new0 (MarshalItem, 1);
mitem->mfunc = marshal;
mitem->signature = signature;
g_hash_table_insert (marshal_table, (gpointer)mitem->signature, mitem);
return TRUE;
}
可以看到在marshal_table
表中存储了MarshalItem
对象,其有两个成员变量mfunc
和signature
。
signature
是它的签名,也是在hash
表中寻找这个对象的key
;
mfunc
是实际的编解码函数,用来编解码客户端发来信息,后面会详细介绍。
函数注册
那么这两个hash
表有没有什么联系呢?
在创建服务将其注册到service_table
时,同时也初始化了服务的函数hash表,先来看看这个函数表是如何使用的:
searpc-server.c
gboolean
searpc_server_register_function (const char *svc_name,
void *func, const gchar *fname, gchar *signature)
{
SearpcService *service;
FuncItem *item;
MarshalItem *mitem;
g_assert (svc_name != NULL && func != NULL && fname != NULL && signature != NULL);
service = g_hash_table_lookup (service_table, svc_name);
if (!service)
return FALSE;
mitem = g_hash_table_lookup (marshal_table, signature);
if (!mitem) {
g_free (signature);
return FALSE;
}
item = g_new0 (FuncItem, 1);
item->marshal = mitem;
item->fname = g_strdup(fname);
item->func = func;
g_hash_table_insert (service->func_table, (gpointer)item->fname, item);
g_free (signature);
return TRUE;
}
- 首先通过服务名在
service_table
找到对应的服务,然后通过签名在marshal_table
找到与函数对应的编解码对象; - 然后生成一个函数对象
FuncItem *item
, 其包括编marshal
(编解码对象)、fname(函数名)
和func
(处理函数指针)三个成员变量; - 最后将函数对象插入到服务的函数
hash
表中。
这样的话,service_table
和marshal_table
的关系大概也说清了。
服务启动
基本的服务初始化完毕,包括处理函数和编解码函数都注册完成后,就可以启动服务了。
服务启动实际上是在通信过程中进行的,在上一章节也有介绍。
其实也就是在searpc-named-pipe-transport.c/h
中创建服务实例对象,在searrpc-named-pipe-transport.c/h
中启动服务对象。
searpc-named-pipe-transport.c
int searpc_named_pipe_server_start(SearpcNamedPipeServer *server)
{
#if !defined(WIN32)
int pipe_fd = socket (AF_UNIX, SOCK_STREAM, 0);
const char *un_path = server->path;
if (pipe_fd < 0) {
g_warning ("Failed to create unix socket fd : %s\n",
strerror(errno));
return -1;
}
struct sockaddr_un saddr;
saddr.sun_family = AF_UNIX;
if (strlen(server->path) > sizeof(saddr.sun_path)-1) {
g_warning ("Unix socket path %s is too long."
"Please set or modify UNIX_SOCKET option in ccnet.conf.\n",
un_path);
goto failed;
}
if (g_file_test (un_path, G_FILE_TEST_EXISTS)) {
g_message ("socket file exists, delete it anyway\n");
if (g_unlink (un_path) < 0) {
g_warning ("delete socket file failed : %s\n", strerror(errno));
goto failed;
}
}
g_strlcpy (saddr.sun_path, un_path, sizeof(saddr.sun_path));
if (bind(pipe_fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
g_warning ("failed to bind unix socket fd to %s : %s\n",
un_path, strerror(errno));
goto failed;
}
if (listen(pipe_fd, 10) < 0) {
g_warning ("failed to listen to unix socket: %s\n", strerror(errno));
goto failed;
}
if (chmod(un_path, 0700) < 0) {
g_warning ("failed to set permisson for unix socket %s: %s\n",
un_path, strerror(errno));
goto failed;
}
server->pipe_fd = pipe_fd;
#endif // !defined(WIN32)
/* TODO: use glib thread pool */
pthread_create(&server->listener_thread, NULL, named_pipe_listen, server);
return 0;
#if !defined(WIN32)
failed:
close(pipe_fd);
return -1;
#endif
}
到这服务的建立及监听都已完成,接下来就是对监听到的客户端消息进行处理。
函数调用
接下来看看如何对消息进行处理。
在上章节通信中,服务端收到客户端消息后,首先将消息解析,之后丢给了searpc_server_call_function
函数处理:
named-pipe-transport.c
static void named_pipe_client_handler(void *data)
{
...
char *service, *body;
if (request_from_json (buf, len, &service, &body) < 0) {
break;
}
gsize ret_len;
char *ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
g_free (service);
g_free (body);
...
}
在searpc_server_call_function
函数中,通过服务名service
找到服务端对应的服务,然后对消息体body
进行处理,最后将结果返回。
searpc-server.c
/* Called by RPC transport. */
char*
searpc_server_call_function (const char *svc_name,
gchar *func, gsize len, gsize *ret_len)
{
SearpcService *service;
json_t *array;
char* ret;
json_error_t jerror;
GError *error = NULL;
#ifdef __linux__
struct timeval start, end, intv;
if (slow_log_fp) {
gettimeofday(&start, NULL);
}
#endif
service = g_hash_table_lookup (service_table, svc_name); // 通过服务名找到对应的服务
if (!service) {
char buf[256];
snprintf (buf, 255, "cannot find service %s.", svc_name);
return error_to_json (501, buf, ret_len);
}
array = json_loadb (func, len, 0 ,&jerror); // 将客户端发送的请求数据json化
if (!array) {
char buf[512];
setjetoge(&jerror,&error);
snprintf (buf, 511, "failed to load RPC call: %s\n", error->message);
json_decref (array);
g_error_free(error);
return error_to_json (511, buf, ret_len);
}
const char *fname = json_string_value (json_array_get(array, 0)); // json化的第一条数据为函数名
FuncItem *fitem = g_hash_table_lookup(service->func_table, fname); // 通过函数名找到对应的函数对象
if (!fitem) {
char buf[256];
snprintf (buf, 255, "cannot find function %s.", fname);
json_decref (array);
return error_to_json (500, buf, ret_len);
}
ret = fitem->marshal->mfunc (fitem->func, array, ret_len);
#ifdef __linux__
if (slow_log_fp) {
gettimeofday(&end, NULL);
timersub(&end, &start, &intv);
print_slow_log_if_necessary (svc_name, func, len, &start, &intv);
}
#endif
json_decref(array);
return ret;
}
最终由
ret = fitem->marshal->mfunc (fitem->func, array, ret_len);
处理完成。
而由前面指定,mfunc
是一个SearpcMarshalFunc
函数指针。
其参数为注册的函数指针,函数指针将要处理的数据,处理结果的大小。
searpc-server.h
typedef gchar* (*SearpcMarshalFunc) (void *func, json_t *param_array,
gsize *ret_len);
实际上SearpcMarshalFunc
函数的主要逻辑就是将待处理的数据解码成我们能够处理的格式,然后交给处理函数func进行处理,具体的将在下一章节介绍。
结语
到这里searpc-server.c/h
文件中的主要逻辑基本都介绍完毕了。