通信协议

Google 的 gRPC 底层通信协议基于标准的 HTTP/2 设计,那么 libsearpc 是基于什么协议呢?

由于seafile的服务通常都是打包安装到同一台服务器上,所以其基于的是IPC进程间通信。

具体实现上,其在 Linux/macOS 系统上使用 Unix domain socket 通讯。

而在 Windows 系统中使用的是 named pipe(命名管道)通讯。

unix domain socket

来看看 searpc 针对 C 和 Python 分别是怎么实现的。

searpc-named-pipe-transport.c

首先是searpc-named-pipe-transport.c文件:

searpc-named-pipe-transport.c

/**
 * Start the named-pipe RPC server.
 *
 * On non-Windows builds this creates an AF_UNIX stream socket, removes any
 * file already present at server->path, binds and listens on that path,
 * restricts the socket file to mode 0700 and stores the descriptor in
 * server->pipe_fd. On every platform it then spawns the listener thread
 * running named_pipe_listen() with @server as its argument.
 *
 * @param server  server object; server->path is the socket / pipe name.
 * @return 0 on success, -1 on failure (the socket, if created, is closed).
 */
int searpc_named_pipe_server_start(SearpcNamedPipeServer *server)
{
    int thread_err;

#if !defined(WIN32)
    int pipe_fd = socket (AF_UNIX, SOCK_STREAM, 0);
    const char *un_path = server->path;
    if (pipe_fd < 0) {
        g_warning ("Failed to create unix socket fd : %s\n",
                   strerror(errno));
        return -1;
    }

    struct sockaddr_un saddr;
    /* Zero the whole address so bind() never sees uninitialized bytes. */
    memset (&saddr, 0, sizeof(saddr));
    saddr.sun_family = AF_UNIX;

    /* Leave room for the terminating NUL in sun_path. */
    if (strlen(un_path) > sizeof(saddr.sun_path)-1) {
        g_warning ("Unix socket path %s is too long."
                       "Please set or modify UNIX_SOCKET option in ccnet.conf.\n",
                       un_path);
        goto failed;
    }

    /* bind() fails if the path already exists, so remove it first. */
    if (g_file_test (un_path, G_FILE_TEST_EXISTS)) {
        g_message ("socket file exists, delete it anyway\n");
        if (g_unlink (un_path) < 0) {
            g_warning ("delete socket file failed : %s\n", strerror(errno));
            goto failed;
        }
    }

    g_strlcpy (saddr.sun_path, un_path, sizeof(saddr.sun_path));
    if (bind(pipe_fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
        g_warning ("failed to bind unix socket fd to %s : %s\n",
                      un_path, strerror(errno));
        goto failed;
    }

    if (listen(pipe_fd, 10) < 0) {
        g_warning ("failed to listen to unix socket: %s\n", strerror(errno));
        goto failed;
    }

    if (chmod(un_path, 0700) < 0) {
        g_warning ("failed to set permisson for unix socket %s: %s\n",
                      un_path, strerror(errno));
        goto failed;
    }

    server->pipe_fd = pipe_fd;

#endif // !defined(WIN32)

    /* TODO: use glib thread pool */
    /* pthread_create() reports failure through its return value, not errno. */
    thread_err = pthread_create(&server->listener_thread, NULL,
                                named_pipe_listen, server);
    if (thread_err != 0) {
        g_warning ("failed to create named pipe listener thread: %s\n",
                   strerror(thread_err));
#if !defined(WIN32)
        goto failed;
#else
        return -1;
#endif
    }
    return 0;

#if !defined(WIN32)
failed:
    close(pipe_fd);
    return -1;
#endif
}
`

在服务启动时,使用int pipe_fd = socket (AF_UNIX, SOCK_STREAM, 0);生成一个socket描述符,然后将其绑定到一个服务的地址上,服务的地址通常是一个.sock文件。 后续通过对该文件的读写进行通讯。

name_pipe.py

python的实现

name_pipe.py

class NamedPipeServer(object):
    """
    Searpc server based on named pipe transport. Note this server is
    very basic and is written for testing purpose only.

    The server listens on a unix domain socket bound to ``socket_path``
    and hands every accepted connection to a new ``PipeHandlerThread``.
    """
    def __init__(self, socket_path):
        """Remember the socket path and prepare the (not yet started) accept thread."""
        self.socket_path = socket_path
        # Listening socket; stays None until init_socket() creates it.
        self.pipe = None
        self.thread = Thread(target=self.accept_loop)
        # Thread.setDaemon() is deprecated; set the attribute directly.
        self.thread.daemon = True

    def start(self):
        """Create the listening socket, then start the accept thread."""
        self.init_socket()
        self.thread.start()

    def stop(self):
        """No-op: this test-only server has no shutdown logic."""
        pass

    def init_socket(self):
        """
        Create the listening unix socket at ``self.socket_path``.

        Any file already at that path is removed first. Raises
        NamedPipeException when that removal fails.
        """
        if os.path.exists(self.socket_path):
            try:
                os.unlink(self.socket_path)
            except OSError:
                raise NamedPipeException(
                    'Failed to remove existing unix socket {}'.
                    format(self.socket_path)
                )
        self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
        make_socket_closeonexec(self.pipe)
        self.pipe.bind(self.socket_path)
        self.pipe.listen(10)
        logger.info('Server now listening at %s', self.socket_path)

    def accept_loop(self):
        """Accept clients forever, starting one PipeHandlerThread per connection."""
        logger.info('Waiting for clients')
        while True:
            connfd, _ = self.pipe.accept()
            logger.info('New pip client')
            t = PipeHandlerThread(connfd)
            t.start()

正如注释所言,python的这段代码并没有在实际项目中执行,只是为了单纯的测试。所以其也没实现Windows的基于命名管道的通讯。

name_pipe

在上面介绍的 searpc_named_pipe_server_start 函数中,在 #if !defined(WIN32) 宏下只有非 Windows 系统的 Unix domain socket 通讯,好像并没有看见关于 Windows named pipe 的实现?

继续往下看,searpc_named_pipe_server_start函数在最后创建了一个线程:

pthread_create(&server->listener_thread, NULL, named_pipe_listen, server);

线程的处理函数为 named_pipe_listen,函数的参数为 server。

事实上,named pipe 的实现放在了 named_pipe_listen 下:

searpc-named-pipe-transport.c

/*
 * Hand an accepted client connection to a worker: the server's thread pool
 * when one is configured, otherwise a newly created detached thread running
 * handle_named_pipe_client_with_thread().
 *
 * If the detached thread cannot be created, the connection is closed and
 * @data is freed here. Otherwise @data is passed on to the worker
 * (presumably released there -- confirm against the handler).
 */
static void dispatch_named_pipe_client(SearpcNamedPipeServer *server,
                                       ServerHandlerData *data)
{
    pthread_t handler;
    pthread_attr_t attr;
    int err;

    if (server->named_pipe_server_thread_pool) {
        g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL);
        return;
    }

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    /* pthread_create() returns the error number instead of setting errno. */
    err = pthread_create(&handler, &attr, handle_named_pipe_client_with_thread, data);
    pthread_attr_destroy(&attr);

    if (err != 0) {
        g_warning ("failed to create named pipe handler thread: %s\n",
                   strerror(err));
#if !defined(WIN32)
        close(data->connfd);
#else
        CloseHandle(data->connfd);
#endif
        g_free(data);
    }
}

/*
 * Listener thread entry point; @arg is the SearpcNamedPipeServer.
 *
 * Non-Windows: accepts connections on server->pipe_fd in a loop.
 * Windows: creates a new named pipe instance for server->path on each
 * iteration and blocks in ConnectNamedPipe() until a client connects.
 *
 * Each accepted connection is wrapped in a ServerHandlerData and dispatched
 * to a worker. The loop ends, and the thread returns NULL, when accepting or
 * creating/connecting the pipe fails.
 */
static void* named_pipe_listen(void *arg)
{
    SearpcNamedPipeServer *server = arg;
#if !defined(WIN32)
    while (1) {
        int connfd = accept (server->pipe_fd, NULL, 0);
        if (connfd < 0) {
            /* Interrupted call or client gone before accept: just retry. */
            if (errno == EINTR || errno == ECONNABORTED)
                continue;
            g_warning ("failed to accept on unix socket: %s\n", strerror(errno));
            break;
        }

        ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
        data->connfd = connfd;
        dispatch_named_pipe_client (server, data);
    }

#else // !defined(WIN32)
    while (1) {
        HANDLE connfd = INVALID_HANDLE_VALUE;
        BOOL connected = FALSE;

        connfd = CreateNamedPipe(
            server->path,             // pipe name
            PIPE_ACCESS_DUPLEX,       // read/write access
            PIPE_TYPE_MESSAGE |       // message type pipe
            PIPE_READMODE_MESSAGE |   // message-read mode
            PIPE_WAIT,                // blocking mode
            PIPE_UNLIMITED_INSTANCES, // max. instances
            kPipeBufSize,             // output buffer size
            kPipeBufSize,             // input buffer size
            0,                        // client time-out
            NULL);                    // default security attribute

        if (connfd == INVALID_HANDLE_VALUE) {
            G_WARNING_WITH_LAST_ERROR ("Failed to create named pipe");
            break;
        }

        /* listening on this pipe */
        /* ERROR_PIPE_CONNECTED is treated as a successful connection. */
        connected = ConnectNamedPipe(connfd, NULL) ?
            TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);

        if (!connected) {
            G_WARNING_WITH_LAST_ERROR ("failed to ConnectNamedPipe()");
            CloseHandle(connfd);
            break;
        }

        ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
        data->connfd = connfd;
        dispatch_named_pipe_client (server, data);
    }
#endif // !defined(WIN32)
    return NULL;
}

因为不需要用python生成服务端,所以没有关于name_pipe的python实现。

总结

实际项目中,seafile 真正的存储内核 seafile-server 使用 C 基于 Unix domain socket(Windows 上为 named pipe)生成了 RPC 服务,seafile 的 Web 客户端 seahub 通过服务端生成的服务文件(socket 文件)与其进行通信。

Copyright © itrunner.cn 2020 all right reserved,powered by Gitbook该文章修订时间: 2022-08-28 07:44:16

results matching ""

    No results matching ""