消息通讯
客户端和服务端的连接建立完毕后,接下来就是如何对通道上的消息进行处理了。
一般都是客户端发起消息请求,服务端对监听到的请求进行处理,并把处理结果发送给客户端。这样一次消息通讯就完成了。
线程池
在消息通讯时我们不希望发送消息或者处理消息的过程影响到我们主线程的运行(有时甚至不想让主线程等待,采用异步通讯,这个暂且不谈),会开辟新的线程来进行消息的处理,可是频繁的消息通讯,每次都启动或销毁线程,开销可能太大。在这种情况下,重用已经启动的线程是一个很好的选择。seafile
也考虑到了这种情况,所以使用了线程池。
searpc-named-pipe-transport.c
/*
 * Create a named-pipe RPC server whose client connections are handled by a
 * GLib thread pool.
 *
 * path                              - pipe path; copied into server->path.
 * named_pipe_server_thread_pool_size - maximum number of pool threads, passed
 *                                      straight to g_thread_pool_new().
 *
 * Returns a newly allocated server, or NULL if path is NULL, does not fit in
 * server->path, or the thread pool could not be created.
 */
SearpcNamedPipeServer* searpc_create_named_pipe_server_with_threadpool (const char *path, int named_pipe_server_thread_pool_size)
{
    GError *error = NULL;
    SearpcNamedPipeServer *server;

    if (!path) {
        g_warning ("Named pipe path must not be NULL.\n");
        return NULL;
    }

    server = g_malloc0 (sizeof(SearpcNamedPipeServer));

    /* server->path is filled in place right after g_malloc0(), so it is
     * treated here as a fixed-size char array member (TODO confirm against
     * the struct definition). g_strlcpy() returns strlen(path); a result
     * >= the destination size means the path would have been truncated. */
    if (g_strlcpy (server->path, path, sizeof(server->path)) >= sizeof(server->path)) {
        g_warning ("Named pipe path is too long: %s\n", path);
        g_free (server);
        return NULL;
    }

    /* Non-exclusive pool: threads are shared with other GLib pools and are
     * started on demand, up to the requested maximum. */
    server->named_pipe_server_thread_pool = g_thread_pool_new (handle_named_pipe_client_with_threadpool,
                                                               NULL,
                                                               named_pipe_server_thread_pool_size,
                                                               FALSE,
                                                               &error);
    if (!server->named_pipe_server_thread_pool) {
        if (error) {
            g_warning ("Falied to create named pipe server thread pool : %s\n", error->message);
            g_clear_error (&error);
        } else {
            g_warning ("Falied to create named pipe server thread pool.\n");
        }
        g_free (server);
        return NULL;
    }

    return server;
}
g_thread_pool_new
会创建一个新的线程池,handle_named_pipe_client_with_threadpool
为线程池的处理函数。named_pipe_server_thread_pool_size
为线程池的大小。
这样,当我们使用g_thread_pool_push
来对线程池进行调用时,会创建一个新的线程,或者重用一个已创建的、未使用的线程。
python 客户端的做法类似,不过池中缓存的是已建立的连接(transport),而不是线程:
named_pipe.py
class NamedPipeClient(SearpcClient):
    """RPC client that reuses connected named-pipe transports from a pool."""

    def __init__(self, socket_path, service_name, pool_size=5):
        self.socket_path = socket_path
        self.service_name = service_name
        self.pool_size = pool_size
        # Idle transports available for reuse; holds at most pool_size items.
        self._pool = queue.Queue(pool_size)

    def _create_transport(self):
        """Open a new transport to ``socket_path`` and return it connected."""
        transport = NamedPipeTransport(self.socket_path)
        transport.connect()
        return transport

    def _get_transport(self):
        """Return an idle pooled transport, or a new one if the pool is empty."""
        try:
            # Non-blocking get: an empty pool raises queue.Empty immediately.
            transport = self._pool.get(False)
        except queue.Empty:
            # Only "pool is empty" triggers a new connection; anything else
            # (e.g. KeyboardInterrupt) must propagate instead of being hidden.
            transport = self._create_transport()
        return transport
seafile
通过python
的客户端连接池(复用已建立的 transport)发起请求,c
的服务端对收到的请求通过线程池进行处理。
数据包传输
既然是通讯,最基本的就是数据包的发送和接收了。
读写数据:
searpc-named-pipe-transport.c
// Write "n" bytes to a descriptor.
gssize
pipe_write_n(int fd, const void *vptr, size_t n)
{
size_t nleft;
gssize nwritten;
const char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0) {
if ( (nwritten = write(fd, ptr, nleft)) <= 0)
{
if (nwritten < 0 && errno == EINTR)
nwritten = 0; /* and call write() again */
else
return(-1); /* error */
}
nleft -= nwritten;
ptr += nwritten;
}
return(n);
}
// Read "n" bytes from a descriptor.
gssize
pipe_read_n(int fd, void *vptr, size_t n)
{
size_t nleft;
gssize nread;
char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0) {
if ( (nread = read(fd, ptr, nleft)) < 0) {
if (errno == EINTR)
nread = 0; /* and call read() again */
else
return(-1);
} else if (nread == 0)
break; /* EOF */
nleft -= nread;
ptr += nread;
}
return(n - nleft); /* return >= 0 */
}
#else // !defined(WIN32)
/*
 * Windows variant: read exactly n bytes from the pipe with one synchronous
 * ReadFile() call.
 *
 * Returns n on success, 0 if the failure was ERROR_BROKEN_PIPE (the other end
 * went away), and -1 for any other failure, including a short read.
 */
gssize pipe_read_n (SearpcNamedPipe fd, void *vptr, size_t n)
{
    DWORD got;
    const DWORD wanted = (DWORD)n;

    /* NULL overlapped pointer: plain blocking I/O. */
    if (ReadFile(fd, vptr, wanted, &got, NULL) && got == wanted)
        return n;

    if (GetLastError() == ERROR_BROKEN_PIPE)
        return 0;

    G_WARNING_WITH_LAST_ERROR("failed to read from pipe");
    return -1;
}
/*
 * Windows variant: write exactly n bytes from vptr to the pipe with one
 * synchronous WriteFile() call, then flush the pipe's buffers.
 *
 * Returns 0 on success (not the byte count) and -1 if the call fails or
 * writes fewer than n bytes.
 */
gssize pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n)
{
    DWORD put;
    const DWORD wanted = (DWORD)n;

    /* vptr is the source buffer; NULL overlapped pointer means blocking I/O. */
    if (!WriteFile(fd, vptr, wanted, &put, NULL) || put != wanted) {
        G_WARNING_WITH_LAST_ERROR("failed to write to named pipe");
        return -1;
    }

    FlushFileBuffers(fd);
    return 0;
}
utils.py
def recvall(fd, total):
    """Read exactly *total* bytes from the socket-like *fd* and return them.

    Keeps calling ``fd.recv`` until the requested amount has arrived.

    Raises:
        NetworkError: if ``fd.recv`` raises ``socket.error`` or returns no
            data before *total* bytes have been received.
    """
    chunks = []
    pending = total
    while pending > 0:
        try:
            chunk = fd.recv(pending)
        except socket.error as e:
            raise NetworkError('Failed to read from socket: %s' % e)

        if not chunk:
            # An empty read means the peer closed before sending everything.
            raise NetworkError("Failed to read from socket")

        chunks.append(chunk)
        pending -= len(chunk)
    return b"".join(chunks)
def sendall(fd, data):
    """Write all of *data* to the socket-like *fd*, looping over partial sends.

    Args:
        fd: object with a socket-style ``send`` method.
        data: bytes-like payload to transmit.

    Raises:
        NetworkError: if ``fd.send`` raises ``socket.error`` or reports that
            no bytes were written.
    """
    total = len(data)
    # Slicing a memoryview does not copy, so a large payload that goes out in
    # many partial sends is no longer re-copied on every iteration.
    view = memoryview(data)
    offset = 0
    while offset < total:
        try:
            n = fd.send(view[offset:])
        except socket.error as e:
            raise NetworkError('Failed to write to socket: %s' % e)

        if n <= 0:
            raise NetworkError('Failed to write to socket')
        offset += n
这没啥好讲的,大部分文件的读写都类似。
定义消息格式
为了方便消息的处理,服务端客户端一般会规定一个双方都认可的消息格式。
searpc-named-pipe-transport.c
char *searpc_named_pipe_send(void *arg, const gchar *fcall_str,
size_t fcall_len, size_t *ret_len)
{
/* g_debug ("searpc_named_pipe_send is called\n"); */
ClientTransportData *data = arg;
SearpcNamedPipeClient *client = data->client;
char *json_str = request_to_json(data->service, fcall_str, fcall_len);
guint32 len = (guint32)strlen(json_str);
if (pipe_write_n(client->pipe_fd, &len, sizeof(guint32)) < 0) {
g_warning("failed to send rpc call: %s\n", strerror(errno));
free (json_str);
return NULL;
}
if (pipe_write_n(client->pipe_fd, json_str, len) < 0) {
g_warning("failed to send rpc call: %s\n", strerror(errno));
free (json_str);
return NULL;
}
free (json_str);
if (pipe_read_n(client->pipe_fd, &len, sizeof(guint32)) < 0) {
g_warning("failed to read rpc response: %s\n", strerror(errno));
return NULL;
}
char *buf = g_malloc(len);
if (pipe_read_n(client->pipe_fd, buf, len) < 0) {
g_warning("failed to read rpc response: %s\n", strerror(errno));
g_free (buf);
return NULL;
}
*ret_len = len;
return buf;
}
named_pipe.py
class NamedPipeTransport(SearpcTransport):
    """
    Length-prefixed JSON transport over ``self.pipe``.

    The protocol is:
    - request: <32b length header><json request>
    - response: <32b length header><json response>
    """

    def send(self, service, fcall_str):
        """Send one call for *service* and return the decoded response text."""
        payload = json.dumps({
            'service': service,
            'request': fcall_str,
        }).encode(encoding='utf-8')

        # "=I": unsigned int, standard 4-byte size, native byte order.
        length_prefix = struct.pack('=I', len(payload))
        sendall(self.pipe, length_prefix)
        sendall(self.pipe, payload)

        # The reply uses the same framing: 4-byte length, then the body.
        (reply_size,) = struct.unpack('=I', recvall(self.pipe, 4))
        reply = recvall(self.pipe, reply_size)
        return reply.decode(encoding='utf-8')
可以看到一次消息的发送,分为两个数据包。 第一个数据包的大小固定,为32位4个字节,用来存放实际数据的大小。 第二个数据包为实际上要发送的数据。
发送请求
/*
 * Build a SearpcClient whose send callback is searpc_named_pipe_send().
 *
 * The callback argument is a newly allocated ClientTransportData that stores
 * the pipe_client pointer as given and a private copy of the service name.
 */
SearpcClient*
searpc_client_with_named_pipe_transport(SearpcNamedPipeClient *pipe_client,
                                        const char *service)
{
    SearpcClient *rpc_client = searpc_client_new();
    ClientTransportData *transport_data = g_malloc(sizeof(ClientTransportData));

    /* Per-client context handed back to searpc_named_pipe_send() as "arg". */
    transport_data->client = pipe_client;
    transport_data->service = g_strdup(service);

    rpc_client->send = searpc_named_pipe_send;
    rpc_client->arg = transport_data;

    return rpc_client;
}
将searpc_named_pipe_send
函数绑定到客户端
,后续客户端通过调用本身的send
函数进行消息发送。
python类似:
named_pipe.py
class NamedPipeClient(SearpcClient):
    """RPC client that reuses connected named-pipe transports from a pool."""

    def __init__(self, socket_path, service_name, pool_size=5):
        self.socket_path = socket_path
        self.service_name = service_name
        self.pool_size = pool_size
        # Idle transports available for reuse; holds at most pool_size items.
        self._pool = queue.Queue(pool_size)

    def _create_transport(self):
        """Open a new transport to ``socket_path`` and return it connected."""
        transport = NamedPipeTransport(self.socket_path)
        transport.connect()
        return transport

    def _get_transport(self):
        """Return an idle pooled transport, or a new one if the pool is empty."""
        try:
            # Non-blocking get: an empty pool raises queue.Empty immediately.
            transport = self._pool.get(False)
        except queue.Empty:
            # Only "pool is empty" triggers a new connection; anything else
            # (e.g. KeyboardInterrupt) must propagate instead of being hidden.
            transport = self._create_transport()
        return transport

    def _return_transport(self, transport):
        """Put *transport* back in the pool, or stop it if the pool is full."""
        try:
            self._pool.put(transport, False)
        except queue.Full:
            transport.stop()

    def call_remote_func_sync(self, fcall_str):
        """Send *fcall_str* to the configured service and return the reply.

        Any exception raised by the transport's ``send`` is re-raised after
        the transport has been stopped.
        """
        transport = self._get_transport()
        try:
            ret_str = transport.send(self.service_name, fcall_str)
        except BaseException:
            # A failed exchange can leave the stream in the middle of a
            # message, so the transport is stopped rather than pooled or
            # leaked.
            transport.stop()
            raise
        self._return_transport(transport)
        return ret_str
消息处理
消息处理主要靠线程池创建时绑定的处理函数:
searpc-named-pipe-transport.c
/*
 * Adapter with a void *(*)(void *) signature: forwards arg to
 * named_pipe_client_handler() and always returns NULL.
 * NOTE(review): presumably the start routine for a dedicated per-client
 * thread; the call site is not shown here.
 */
static void* handle_named_pipe_client_with_thread(void *arg)
{
named_pipe_client_handler(arg);
return NULL;
}
/*
 * Worker function registered with g_thread_pool_new() by
 * searpc_create_named_pipe_server_with_threadpool(). Forwards data to
 * named_pipe_client_handler(); user_data is ignored (the pool is created
 * with NULL user data).
 */
static void handle_named_pipe_client_with_threadpool(void *data, void *user_data)
{
named_pipe_client_handler(data);
}
static void named_pipe_client_handler(void *data)
{
ServerHandlerData *handler_data = data;
SearpcNamedPipe connfd = handler_data->connfd;
guint32 len;
guint32 bufsize = 4096;
char *buf = g_malloc(bufsize);
g_message ("start to serve on pipe client\n");
while (1) {
len = 0;
if (pipe_read_n(connfd, &len, sizeof(guint32)) < 0) {
g_warning("failed to read rpc request size: %s\n", strerror(errno));
break;
}
if (len == 0) {
/* g_debug("EOF reached, pipe connection lost"); */
break;
}
while (bufsize < len) {
bufsize *= 2;
buf = realloc(buf, bufsize);
}
if (pipe_read_n(connfd, buf, len) < 0 || len == 0) {
g_warning("failed to read rpc request: %s\n", strerror(errno));
break;
}
char *service, *body;
if (request_from_json (buf, len, &service, &body) < 0) {
break;
}
gsize ret_len;
char *ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
g_free (service);
g_free (body);
len = (guint32)ret_len;
if (pipe_write_n(connfd, &len, sizeof(guint32)) < 0) {
g_warning("failed to send rpc response(%s): %s\n", ret_str, strerror(errno));
g_free (ret_str);
break;
}
if (pipe_write_n(connfd, ret_str, ret_len) < 0) {
g_warning("failed to send rpc response: %s\n", strerror(errno));
g_free (ret_str);
break;
}
g_free (ret_str);
}
#if !defined(WIN32)
close(connfd);
#else // !defined(WIN32)
DisconnectNamedPipe(connfd);
CloseHandle(connfd);
#endif // !defined(WIN32)
g_free (data);
g_free (buf);
}
服务端监听到有消息收到时,先读取4个字节,得到具体数据的大小,再通过数据的大小,得到具体的请求,服务端对得到的请求进行处理,最后将处理的结果返回给客户端。
python
类似:
named_pipe.py
class PipeHandlerThread(Thread):
    """Daemon thread that serves RPC requests arriving on one connected pipe."""

    def __init__(self, pipe):
        Thread.__init__(self)
        # Same effect as the deprecated setDaemon(True).
        self.daemon = True
        self.pipe = pipe

    def run(self):
        """Serve requests until a read or write on the pipe raises.

        Each request is a 4-byte length header followed by a UTF-8 JSON
        object with ``service`` and ``request`` keys; the reply is sent back
        with the same framing.
        """
        while True:
            # "=I" matches the client side: unsigned int, standard 4-byte
            # size, native byte order.
            req_header = recvall(self.pipe, 4)
            req_size, = struct.unpack('=I', req_header)
            req = recvall(self.pipe, req_size)

            data = json.loads(req.decode(encoding='utf-8'))
            resp = searpc_server.call_function(data['service'], data['request'])

            # Encode first: the header must carry the byte length of what is
            # actually sent, which differs from len(resp) for non-ASCII text.
            resp_utf8 = resp.encode(encoding='utf-8')
            resp_header = struct.pack('=I', len(resp_utf8))
            sendall(self.pipe, resp_header)
            sendall(self.pipe, resp_utf8)
从建立连接,到消息的发送,再到处理结果的返回,一次完整的通信就完成了。