MyException - 我的异常网
当前位置:我的异常网» 编程 » 内存共享实现聊天室程序(附带一个有关问题求解)

内存共享实现聊天室程序(附带一个有关问题求解)

www.MyException.Cn  网友分享于:2013-10-17  浏览:10次
内存共享实现聊天室程序(附带一个问题求解)

     聊天室程序中至少要求每个用户的发言能立即呈现给其它用户,为了提高效率,每个用户连接在服务端都对应一个子进程处理该用户连接。所有用户的发言数据记录在一个用户共享内存中,假设A用户发言了那么共享内存中某段数据t对应A的发言数据,用户B对应的子进程是pid_b处理用户B,那么pid_b只要到到共享内存位置t读取A的发言数据并发送给B,则聊天室逻辑就成立了。为了达到该设计需求,服务端主进程监听端口遇见有用户连接请求就fork一个子进程处理该用户连接。子进程收到其对应的用户发言数据了就通过管道告诉主进程我(这个子进程)有话要说,此时主进程将这些话传送给其它子进程要求这些子进程将这些话发送给它们对用的客户。而这些发言数据通过共享内存实现只需要传递些指针即可,且每个子进程只需要向其对应的位置写数据所以不会出现交叉写的竞态。

服务端程序:(文章最后有个问题希望有人能指教)

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

#define USER_LIMIT 5//最大客户数
#define BUFFER_SIZE 1024//缓冲区
#define FD_LIMIT 65535//最大支持文件数目
#define MAX_EVENT_NUMBER 1024//最大事件数
#define PROCESS_LIMIT 65536//最大子进程数

struct client_data//客户数据
{
    sockaddr_in address;//客户端地址
    int connfd;//客户连接描述符
    pid_t pid;//处理该连接的子进程号
    int pipefd[2];//管道描述符用户主进程向子进程之间传递数据是socketpair全双工管道,pipefd[0]用于主进程向子进程写入数据,pipefd[1]用于子进程监听主进程是否有数据发送到来
};

static const char* shm_name = "/my_shm";
int sig_pipefd[2];//传递信号值的管道(用于将信号事件与IO事件统一监听)
int epollfd;//事件表描述符
int listenfd;//服务端监听端口
int shmfd;//共享内存标示符
char* share_mem = 0;//共享内存地址
client_data* users = 0;//客户数据数组
int* sub_process = 0;//每个子进程pid对应处理的那个客户编号,采用pid作为数组索引下标得到客户编号
int user_count = 0;//当前连接用户
bool stop_child = false;//终止子进程

int setnonblocking( int fd )//将文件描述符设置为非阻塞
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd )//向注册表添加新的可读事件
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void sig_handler( int sig )//信号处理函数
{
    int save_errno = errno;
    int msg = sig;
    send( sig_pipefd[1], ( char* )&msg, 1, 0 );//通过管道sig_pipefd向主进程发送信号值
    errno = save_errno;
}

void addsig( int sig, void(*handler)(int), bool restart = true )//安装信号
{
    struct sigaction sa;
    memset( &sa, '\0', sizeof( sa ) );
    sa.sa_handler = handler;
    if( restart )
    {
        sa.sa_flags |= SA_RESTART;//是否重启由于信号中断的系统调用
    }
    sigfillset( &sa.sa_mask );
    assert( sigaction( sig, &sa, NULL ) != -1 );
}

void del_resource()//清理资源
{
    close( sig_pipefd[0] );
    close( sig_pipefd[1] );
    close( listenfd );
    close( epollfd );
    shm_unlink( shm_name );
    delete [] users;
    delete [] sub_process;
}

void child_term_handler( int sig )//设置终止子进程标志
{
    stop_child = true;
}

int run_child( int idx, client_data* users, char* share_mem )//子进程运行逻辑,idx是子进程处理的客户连接描述符,users是所有用户数据数组名,share_mem是共享内存地址
{
    epoll_event events[ MAX_EVENT_NUMBER ];
    int child_epollfd = epoll_create( 5 );
    assert( child_epollfd != -1 );
    int connfd = users[idx].connfd;//获取该子进程对应处理的那个客户连接描述符
    addfd( child_epollfd, connfd );//监听客户连接可读事件(客户端发送数据到来)
    int pipefd = users[idx].pipefd[1];//获取该连接上子进程与主进程通信管道的读端
    addfd( child_epollfd, pipefd );//监听管道可读事件即主进程向子进程发送数据(这个数据是个子进程号pid是处理某个客户连接子进程pid意味着该pid对应的客户发言了,其它客户(本子进程处理的客户就要接收发言)该接收这个发言)
    int ret;
    addsig( SIGTERM, child_term_handler, false );//子进程终止信号安装,就是将子进程终止标志stop_child置为真

    while( !stop_child )
    {
        int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );//无限期等待客户连接是否有数据发送到来和主进程是否通过管道发送pid来
        if ( ( number < 0 ) && ( errno != EINTR ) )
        {
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) )//若该子进程对应的客户连接有数据发送到来
            {
                memset( share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE );
                ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );//接收
                if( ret < 0 )
                {
                    if( errno != EAGAIN )//非阻塞EAGAIN非网络出错而是设备当前不可用即数据读完了
                    {
                        stop_child = true;
                    }
                }
                else if( ret == 0 )//关闭连接
                {
                    stop_child = true;
                }
                else
                {
                    send( pipefd, ( char* )&idx, sizeof( idx ), 0 );//通过全双工管道向主进程通知该子进程处理的客户有数据发送到来
                }
            }
            else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) )//若管道可读则是父进程通知子进程其它某个用户发言了需要将该发言数据发送给子进程对应的客户
            {
                int client = 0;
                ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );//获取那个用户发言了
                if( ret < 0 )
                {
                    if( errno != EAGAIN )
                    {
                        stop_child = true;
                    }
                }
                else if( ret == 0 )//
                {
                    stop_child = true;
                }
                else
                {
                    send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 );//读取client那个发言用户的数据发送给该子进程对应的客户
                }
            }
            else
            {
                continue;
            }
        }
    }

    close( connfd );
    close( pipefd );
    close( child_epollfd );
    return 0;
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    user_count = 0;
    users = new client_data [ USER_LIMIT+1 ];//用户数组
    sub_process = new int [ PROCESS_LIMIT ];//子进程对应的客户编号,便于通过子进程pid作为下标索引到子进程处理的客户连接描述符
    for( int i = 0; i < PROCESS_LIMIT; ++i )
    {
        sub_process[i] = -1;//初始化为不可能的描述符号
    }

    epoll_event events[ MAX_EVENT_NUMBER ];
    epollfd = epoll_create( 5 );//创建主进程事件表
    assert( epollfd != -1 );
    addfd( epollfd, listenfd );//将监听端口加入事件表用于与客户端建立连接

    ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );//创建管道用于信号处理函数将信号值传给主进程
    assert( ret != -1 );
    setnonblocking( sig_pipefd[1] );
    addfd( epollfd, sig_pipefd[0] );

    addsig( SIGCHLD, sig_handler );//注册信号回调函数
    addsig( SIGTERM, sig_handler );
    addsig( SIGINT, sig_handler );
    addsig( SIGPIPE, SIG_IGN );
    bool stop_server = false;//是否停止服务端
    bool terminate = false;//终止服务端

    shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 );//创建一个共享内存对象
    assert( shmfd != -1 );
    ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE ); //设置共享内存大小
    assert( ret != -1 );

    share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );//将共享内存映射到share_mem中
    assert( share_mem != MAP_FAILED );
    close( shmfd );//关闭共享标识符

    while( !stop_server )
    {
        int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );//主进程等待客户端连接事件和子进程发送某个用户发言事件
        if ( ( number < 0 ) && ( errno != EINTR ) )
        {
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;//获取事件对应的描述符
            if( sockfd == listenfd )//监听端口事件表明有客户端请求建立连接
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                if ( connfd < 0 )
                {
                    printf( "errno is: %d\n", errno );
                    continue;
                }
                if( user_count >= USER_LIMIT )//客户连接已满拒绝连接
                {
                    const char* info = "too many users\n";
                    printf( "%s", info );
                    send( connfd, info, strlen( info ), 0 );
                    close( connfd );
                    continue;
                }
                users[user_count].address = client_address;//初始化客户连接数据
                users[user_count].connfd = connfd;
                ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );//用于该客户连接对应的子进程与主进程通信
                assert( ret != -1 );
                pid_t pid = fork();
                if( pid < 0 )
                {
                    close( connfd );
                    continue;
                }
                else if( pid == 0 )//子进程
                {
                    close( epollfd );//关闭继承来的描述符
                    close( listenfd );
                    close( users[user_count].pipefd[0] );
                    close( sig_pipefd[0] );
                    close( sig_pipefd[1] );
                    run_child( user_count, users, share_mem );//运行子进程逻辑
                    munmap( (void*)share_mem,  USER_LIMIT * BUFFER_SIZE );//释放共享内存
                    exit( 0 );
                }
                else
                {
                    close( connfd );//父进程不需要这个客户连接描述符了
                    close( users[user_count].pipefd[1] );//关闭pipefd主进程中的写端
                    addfd( epollfd, users[user_count].pipefd[0] );//将pipefd读端加入事件侦听,若侦听到事件表明某个子进程收到客户发来的数据主进程此时要将该数据发送给其它用户(聊天室所有发言所有用户可见)
                    users[user_count].pid = pid;
                    sub_process[pid] = user_count;
                    user_count++;//用户数增加
                }
            }
            else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )//信号管道有事件就绪
            {
                int sig;
                char signals[1024];
                ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );//信号值为1B所以每个字节是一个信号
                if( ret == -1 )
                {
                    continue;
                }
                else if( ret == 0 )
                {
                    continue;
                }
                else
                {
                    for( int i = 0; i < ret; ++i )//挨个处理信号值
                    {
                        switch( signals[i] )
                        {
                            case SIGCHLD:
                            {
	                        pid_t pid;
	                        int stat;
	                        while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 )
                                {
                                    int del_user = sub_process[pid];
                                    sub_process[pid] = -1;
                                    if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) )
                                    {
                                        printf( "the deleted user was not change\n" );
                                        continue;
                                    }
                                    epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );//删除注册的事件
                                    close( users[del_user].pipefd[0] );
                                    users[del_user] = users[--user_count];
                                    sub_process[users[del_user].pid] = del_user;
                                    printf( "child %d exit, now we have %d users\n", del_user, user_count ); 
                                }
                                if( terminate && user_count == 0 )
                                {
                                    stop_server = true;
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:
                            {
                                printf( "kill all the clild now\n" );
                                //addsig( SIGTERM, SIG_IGN );
                                //addsig( SIGINT, SIG_IGN );
                                if( user_count == 0 )
                                {
                                    stop_server = true;
                                    break;
                                }
                                for( int i = 0; i < user_count; ++i )
                                {
                                    int pid = users[i].pid;
                                    kill( pid, SIGTERM );//杀死子进程
                                }
                                terminate = true;
                                break;
                            }
                            default:
                            {
                                break;
                            }
                        }
                    }
                }
            }
            else if( events[i].events & EPOLLIN )//某个子进程向父进程发送通知客户有发言
            {
                int child = 0;
                ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 );//接收子进程号
                printf( "read data from child accross pipe\n" );
                if( ret == -1 )
                {
                    continue;
                }
                else if( ret == 0 )
                {
                    continue;
                }
                else
                {
                    for( int j = 0; j < user_count; ++j )//除去该子进程child,其余的子进程都要向这些子进程对应的客户发送child对应用户发言的数据
                    {
                        if( users[j].pipefd[0] != sockfd )
                        {
                            printf( "send data to child accross pipe\n" );
                            send( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 );//向子进程通知发送数据
                        }
                    }
                }
            }
        }
    }

    del_resource();
    return 0;
}

假设有主进程,用户A对应子进程pid1,用户B对应子进程pid2...,现在A用户发言了,主进程需要将A对应的pid1发送给其它子进程以让其它用户接收到A的发言数据,这个过程需要主进程将pid1写到到pid2管道的0端即pipefd_2[0](这里假设pid2与主进程通信管道为pipefd_2),假设在写这个管道之前发生了如下情形:用户B发言了,且将自己的pid2写到管道pipefd_2[1]通知主进程B发言了,此时回到主进程向pipefd_2[0]写pid1....这个过程对于管道数据是否会出现覆盖?不知道全双工管道是如何设计的?

文章评论

如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
鲜为人知的编程真相
鲜为人知的编程真相
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
每天工作4小时的程序员
每天工作4小时的程序员
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
那些争议最大的编程观点
那些争议最大的编程观点
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
10个调试和排错的小建议
10个调试和排错的小建议
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
 程序员的样子
程序员的样子
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
程序员应该关注的一些事儿
程序员应该关注的一些事儿
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
代码女神横空出世
代码女神横空出世
程序员和编码员之间的区别
程序员和编码员之间的区别
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
为什么程序员都是夜猫子
为什么程序员都是夜猫子
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
中美印日四国程序员比较
中美印日四国程序员比较
程序员的鄙视链
程序员的鄙视链
老程序员的下场
老程序员的下场
我是如何打败拖延症的
我是如何打败拖延症的
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
编程语言是女人
编程语言是女人
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
总结2014中国互联网十大段子
总结2014中国互联网十大段子
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
我的丈夫是个程序员
我的丈夫是个程序员
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
程序员都该阅读的书
程序员都该阅读的书
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
一个程序员的时间管理
一个程序员的时间管理
程序员必看的十大电影
程序员必看的十大电影
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有