Server

初始化

在使用服务之前,必须得先对服务所需的一些结构进行初始化。

searpc-server.c

void
searpc_server_init (RegisterMarshalFunc register_func)
{
    marshal_table = g_hash_table_new_full (g_str_hash, g_str_equal,
                                           NULL, (GDestroyNotify)marshal_item_free);
    service_table = g_hash_table_new_full (g_str_hash, g_str_equal,
                                           NULL, (GDestroyNotify)service_free);

    register_func ();
}

可以看到这里涉及到marshal_tableservice_table以及register_func(也就是RegisterMarshalFunc)。

marshal_tableservice_table是两块hash表:

searpc-server.c

static GHashTable *marshal_table;
static GHashTable *service_table;

RegisterMarshalFunc可以理解为数据编解码的注册函数,事实上在已有的代码里是搜不到它的实现的,在下一章节会详细介绍。

searpc-server.c

typedef void (*RegisterMarshalFunc) (void);

服务创建

先来看看service_table这个hash表的主要作用。

libsearpc通过服务名创建出一个SearpcService并将其注册到service_table这个hash表中,后续可通过服务名在hash表中找到我们对应的服务。

searpc-server.c

typedef struct {
    char *name;
    GHashTable *func_table;
} SearpcService;

...

int
searpc_create_service (const char *svc_name)
{
    SearpcService *service;

    if (!svc_name)
        return -1;

    if (g_hash_table_lookup (service_table, svc_name) != NULL)
        return 0;

    service = g_new0 (SearpcService, 1);
    service->name = g_strdup(svc_name);
    service->func_table = g_hash_table_new_full (g_str_hash, g_str_equal,
                                                 NULL, (GDestroyNotify)func_item_free);

    g_hash_table_insert (service_table, service->name, service);

    return 0;
}

SearpcService结构中存储了服务名以及存在要调用的函数hash表。 也就是在service_table表中存储了SearpcService服务的指针,其key是服务名。

编解码函数注册

再看看marshal_table这个hash表的主要作用:

searpc-server.c

gboolean
searpc_server_register_marshal (gchar *signature, SearpcMarshalFunc marshal)
{
    MarshalItem *mitem;

    g_assert (signature != NULL && marshal != NULL);

    if (g_hash_table_lookup (marshal_table, signature) != NULL) {
        g_warning ("[Sea RPC] cannot register duplicate marshal.\n");
        g_free (signature);
        return FALSE;
    }

    mitem = g_new0 (MarshalItem, 1);
    mitem->mfunc = marshal;
    mitem->signature = signature;
    g_hash_table_insert (marshal_table, (gpointer)mitem->signature, mitem);

    return TRUE;
}

可以看到在marshal_table表中存储了MarshalItem对象,其有两个成员变量mfuncsignaturesignature是它的签名,也是在hash表中寻找这个对象的keymfunc是实际的编解码函数,用来编解码客户端发来信息,后面会详细介绍。

函数注册

那么这两个hash表有没有什么联系呢?

在创建服务将其注册到service_table时,同时也初始化了服务的函数hash表,先来看看这个函数表是如何使用的:

searpc-server.c

gboolean
searpc_server_register_function (const char *svc_name,
                                 void *func, const gchar *fname, gchar *signature)
{
    SearpcService *service;
    FuncItem *item;
    MarshalItem *mitem;

    g_assert (svc_name != NULL && func != NULL && fname != NULL && signature != NULL);

    service = g_hash_table_lookup (service_table, svc_name);
    if (!service)
        return FALSE;

    mitem = g_hash_table_lookup (marshal_table, signature);
    if (!mitem) {
        g_free (signature);
        return FALSE;
    }

    item = g_new0 (FuncItem, 1);
    item->marshal = mitem;
    item->fname = g_strdup(fname);
    item->func = func;

    g_hash_table_insert (service->func_table, (gpointer)item->fname, item);
    g_free (signature);
    return TRUE;
}
  • 首先通过服务名在service_table找到对应的服务,然后通过签名在marshal_table找到与函数对应的编解码对象;
  • 然后生成一个函数对象FuncItem *item, 其包括编marshal(编解码对象)、fname(函数名)func(处理函数指针)三个成员变量;
  • 最后将函数对象插入到服务的函数hash表中。

这样的话,service_tablemarshal_table的关系大概也说清了。

服务启动

基本的服务初始化完毕,包括处理函数和编解码函数都注册完成后,就可以启动服务了。

服务启动实际上是在通信过程中进行的,在上一章节也有介绍。

其实也就是在searpc-named-pipe-transport.c/h中创建服务实例对象,在searrpc-named-pipe-transport.c/h中启动服务对象。

searpc-named-pipe-transport.c

int searpc_named_pipe_server_start(SearpcNamedPipeServer *server)
{
#if !defined(WIN32)
    int pipe_fd = socket (AF_UNIX, SOCK_STREAM, 0);
    const char *un_path = server->path;
    if (pipe_fd < 0) {
        g_warning ("Failed to create unix socket fd : %s\n",
                   strerror(errno));
        return -1;
    }

    struct sockaddr_un saddr;
    saddr.sun_family = AF_UNIX;

    if (strlen(server->path) > sizeof(saddr.sun_path)-1) {
        g_warning ("Unix socket path %s is too long."
                       "Please set or modify UNIX_SOCKET option in ccnet.conf.\n",
                       un_path);
        goto failed;
    }

    if (g_file_test (un_path, G_FILE_TEST_EXISTS)) {
        g_message ("socket file exists, delete it anyway\n");
        if (g_unlink (un_path) < 0) {
            g_warning ("delete socket file failed : %s\n", strerror(errno));
            goto failed;
        }
    }

    g_strlcpy (saddr.sun_path, un_path, sizeof(saddr.sun_path));
    if (bind(pipe_fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
        g_warning ("failed to bind unix socket fd to %s : %s\n",
                      un_path, strerror(errno));
        goto failed;
    }

    if (listen(pipe_fd, 10) < 0) {
        g_warning ("failed to listen to unix socket: %s\n", strerror(errno));
        goto failed;
    }

    if (chmod(un_path, 0700) < 0) {
        g_warning ("failed to set permisson for unix socket %s: %s\n",
                      un_path, strerror(errno));
        goto failed;
    }

    server->pipe_fd = pipe_fd;

#endif // !defined(WIN32)

    /* TODO: use glib thread pool */
    pthread_create(&server->listener_thread, NULL, named_pipe_listen, server);
    return 0;

#if !defined(WIN32)
failed:
    close(pipe_fd);
    return -1;
#endif
}

到这服务的建立及监听都已完成,接下来就是对监听到的客户端消息进行处理。

函数调用

接下来看看如何对消息进行处理。

在上章节通信中,服务端收到客户端消息后,首先将消息解析,之后丢给了searpc_server_call_function函数处理:

named-pipe-transport.c

static void named_pipe_client_handler(void *data)
{
    ...
        char *service, *body;
        if (request_from_json (buf, len, &service, &body) < 0) {
            break;
        }

        gsize ret_len;
        char *ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
        g_free (service);
        g_free (body);
    ...
}

searpc_server_call_function函数中,通过服务名service找到服务端对应的服务,然后对消息体body进行处理,最后将结果返回。

searpc-server.c

/* Called by RPC transport. */
char*
searpc_server_call_function (const char *svc_name,
                             gchar *func, gsize len, gsize *ret_len)
{
    SearpcService *service;
    json_t *array;
    char* ret;
    json_error_t jerror;
    GError *error = NULL;

#ifdef __linux__
    struct timeval start, end, intv;

    if (slow_log_fp) {
        gettimeofday(&start, NULL);
    }
#endif

    service = g_hash_table_lookup (service_table, svc_name);   // 通过服务名找到对应的服务
    if (!service) {
        char buf[256];
        snprintf (buf, 255, "cannot find service %s.", svc_name);
        return error_to_json (501, buf, ret_len);
    }

    array = json_loadb (func, len, 0 ,&jerror);   // 将客户端发送的请求数据json化

    if (!array) {
        char buf[512];
        setjetoge(&jerror,&error);
        snprintf (buf, 511, "failed to load RPC call: %s\n", error->message);
        json_decref (array);
        g_error_free(error);
        return error_to_json (511, buf, ret_len);
    }

    const char *fname = json_string_value (json_array_get(array, 0));  // json化的第一条数据为函数名
    FuncItem *fitem = g_hash_table_lookup(service->func_table, fname); // 通过函数名找到对应的函数对象
    if (!fitem) {
        char buf[256];
        snprintf (buf, 255, "cannot find function %s.", fname);
        json_decref (array);
        return error_to_json (500, buf, ret_len);
    }

    ret = fitem->marshal->mfunc (fitem->func, array, ret_len);

#ifdef __linux__
    if (slow_log_fp) {
        gettimeofday(&end, NULL);
        timersub(&end, &start, &intv);
        print_slow_log_if_necessary (svc_name, func, len, &start, &intv);
    }
#endif

    json_decref(array);

    return ret;
}

最终由

ret = fitem->marshal->mfunc (fitem->func, array, ret_len);

处理完成。

而由前面指定,mfunc是一个SearpcMarshalFunc函数指针。

其参数为注册的函数指针,函数指针将要处理的数据,处理结果的大小。

searpc-server.h

typedef gchar* (*SearpcMarshalFunc) (void *func, json_t *param_array,
    gsize *ret_len);

实际上SearpcMarshalFunc函数的主要逻辑就是将待处理的数据解码成我们能够处理的格式,然后交给处理函数func进行处理,具体的将在下一章节介绍。

结语

到这里searpc-server.c/h文件中的主要逻辑基本都介绍完毕了。

Copyright © itrunner.cn 2020 all right reserved,powered by Gitbook该文章修订时间: 2024-08-01 14:07:45

results matching ""

    No results matching ""