|
[Original]
[Print]
[Top]
|
文件上传与下载的服务器与客户端:
这个程序是我为我下个月去北京打气写的,给自己找点感觉和信心,我花了2天半的时间写的。我简单的介绍一下这个程序的思路,它虽然是个网络相关的,但其实我网络知识并不好,这次是第1次在LINUX用C做SOCKET程序,所以程序可能写的不够健壮。还请大家能提一些宝贵的意见。注:我并没有参考任何代码(包括开源的)。所以大家如果觉得不是垃圾的话可以尽管拿去用。因为时间紧张,可能会存在很多问题,希望大家能多提一下。
服务器文件上传服务和下载服务是分成2个程序写的,可以很容易整合。
文件上传服务:预先建立线程池,线程池里的线程数目可以单调增加,直至最大值。接受服务请求(accept())由主线程来做,把接受来的请求放到一个循环队列里,其是生产者,然后线程池里的其余的线程是消费者。这里我就不解释了。锁和条件变量pthread_mutex_t Uploadfile_clifd_mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t Uploadfile_buff_notFull=PTHREAD_COND_INITIALIZER;
pthread_cond_t Uploadfile_buff_notEmpty=PTHREAD_COND_INITIALIZER; 保证了这一个工作。看代码吧。统计忙线程数是通过那个循环队列计算出来的。
而文件下载服务用了一个完全不同的工作方式,预先建立线程池,线程池里的线程数目可以单调增加,直至最大值。但是各个线程各自accept()。但我并没有选择让所有的线程都阻塞在accept(),那样的话因为所有的线程共享一个监听套接字,那么OS会保证一切顺利,但当有请求到达时会引起“惊群”现象,因为他们睡眠在同一个监听套接字,有请求到达,OS会全部叫醒(但也可能wakeup_one()),一个取走,其余查看被取走后再转去睡眠。
我增加了一个线程锁,让所有线程互斥做accept()。但是这样所有线程阻塞在一个线程锁上和在内核阻塞在一个套接字上哪个更合算呢?如果你知道答案,请务必告诉我。另外在这个因为各个线程各自accept,就会出现了一个问题,就是如何统计系统忙否(线程池里的忙线程数,看其是否需要扩展)。我想出来的解决办法是增加一个记数值及配套的一个锁。每当一个线程accept成功后记数值增一,完了再减一。而对这个记数值的检查工作有主线程来做,安装了SIGALRM的处理函数,然后alarm()设置了闹钟,每隔一段时间(CHECK_DOWNLOAD_SERVER_TIME)检查一次,而当线程池达到最大之后,在最后一次检查就撤消了SIGALRM的处理,也取消了闹钟。
关于数据库这边,我数据库作为了一个编译选项,如果 #define DB_INSTALL了就会编译数据库进去,为了代码容易阅读和修改,还有数据库的无关,我比划着JAVA里常用的几个数据库类的函数名做了代码封装,一个是DB结构体,用来连接,初始化,查询(主要),一个是Result结构体,用来对查询结果的处理。因为这个小程序本身用到的功能不多,所以只是 “按需定义”。
/*******************************
UNISOCK。H:
*********************************/
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/stat.h>
#define DEBUG/*为了显示调试信息定义*/
#ifndef BUFFSIZE
#define BUFFSIZE 1024
#endif
/*Some length info*/
#ifndef USER_NAME_LEN /*用户名的长度*/
#define USER_NAME_LEN 25
#endif
#ifndef PATH_ESCAPE /*系统路径分隔符*/
#define PATH_ESCAPE '/'
#endif
#ifndef USER_PASSWD_LEN/*用户密码长度*/
#define USER_PASSWD_LEN 60
#endif
#ifndef FILENAME_SIZE /*文件名的长度*/
#define FILENAME_SIZE 30
#endif
#ifndef FILESIZE_BYTES
#define FILESIZE_BYTES 20
#endif
/*length info end*/
#ifndef MAX_FILENUMBER_PER_USER /*每个用户最多可以上传的文件数*/
#define MAX_FILENUMBER_PER_USER 25
#endif
/*服务器文件上传和下载服务线程数目可以单调增加,有最大限制*/
#ifndef MAX_UPLOADFILE_THREAD /*服务器最多的文件上传服务线程数目*/
#define MAX_UPLOADFILE_THREAD 100
#endif
#ifndef NORM_UPLOADFILE_THREAD /*服务器正常的文件上传服务线程数目*/
#define NORM_UPLOADFILE_THREAD 50
#endif
#ifndef UPLOAD_FILE_PORT
#define UPLOAD_FILE_PORT 9877
#endif
#ifndef USER_UPLOADFILE_DIR/*用户文件上传目录,可以考虑用支持小文件的文件系统*/
#define USER_UPLOADFILE_DIR "/root/work/upload"
#endif
#ifndef MAX_DOWNLOADFILE_THREAD
#define MAX_DOWNLOADFILE_THREAD 100
#endif
#ifndef NORM_DOWNLOADFILE_THREAD
#define NORM_DOWNLOADFILE_THREAD 50
#endif
#ifndef USER_DOWNLOADFILE_DIR
#define USER_DOWNLOADFILE_DIR USER_UPLOADFILE_DIR
#endif
#ifndef DOWNLOAD_FILE_PORT
#define DOWNLOAD_FILE_PORT 9878
#endif
/*查看下载文件服务线程,看其是否需要增加*/
#ifndef CHECK_DOWNLOAD_SERVER_TIME
#define CHECK_DOWNLOAD_SERVER_TIME 100
#endif
/**The below MACROS can be used by all server functions,eg,Upload and Download*/
/*listen()函数的第2个参数(backlog)*/
#ifndef LISTENMAX
#define LISTENMAX 1024
#endif
/*当前的上传或下载服务线程数目*/
int UPLOADFILE_THREAD_CNT=NORM_UPLOADFILE_THREAD;
int DOWNLOADFILE_THREAD_CNT=NORM_DOWNLOADFILE_THREAD;
int THREAD_NEED_ADD=5;
int THREAD_INCREMENT=20;
/*用户最大可上传文件的大小限制*/
#ifndef MAX_UPLOADFILE_SIZE
#define MAX_UPLOADFILE_SIZE (1<<20)
#endif
/*每个用户有上传空间限制*/
#ifndef ONE_USER_UPLOAD_SPACE
#define ONE_USER_UPLOAD_SPACE (1<<23)
#endif
#define sys_exit_msg(MSG) do{fprintf(stderr,"%s:%s
",MSG,strerror(errno));exit(errno);}while(0)
#define shutdown_msg(MSG) /*test*/;
#define ERR_INFO(MSG) do{fprintf(stederr,MSG);}while(0)
#define INFO_MSG(MSG) ERR_INFO(MSG)
/*锁和条件变量,各对各的用处看代码,前3个是类似生产者消费者的循环队列*/
pthread_mutex_t Uploadfile_clifd_mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t Uploadfile_buff_notFull=PTHREAD_COND_INITIALIZER;
pthread_cond_t Uploadfile_buff_notEmpty=PTHREAD_COND_INITIALIZER;
pthread_mutex_t Downloadfile_clifd_mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Downloadfile_BusyThread_CNT=PTHREAD_MUTEX_INITIALIZER;
struct UpThread{
int fd;
char addrstr[INET_ADDRSTRLEN];
};
struct DownThread{
int listenfd;
int *count;
};
/*为了节省敲代码,做了几个包装函数*/
void Pthread_mutex_lock(pthread_mutex_t *arg){
if(pthread_mutex_lock(arg)<0)
sys_exit_msg("MUTEX LOCK ERROR:");
}
void Pthread_mutex_unlock(pthread_mutex_t *arg){
if(pthread_mutex_unlock(arg)<0)
sys_exit_msg("MUTEX UNLOCK ERROR:");
}
void Pthread_cond_signal(pthread_cond_t *arg){
if(pthread_cond_signal(arg)<0)
sys_exit_msg("Condition varible signal error:");
}
void Pthread_cond_wait(pthread_cond_t *arg,pthread_mutex_t *mu){
if(pthread_cond_wait(arg,mu)<0)
sys_exit_msg("Condition varible wait error:");
}
ssize_t readn(int fd,void *buff,size_t n){
ssize_t nleft=n,nread=0;
char *ptr=buff;
while(nleft>0){
if((nread=read(fd,ptr,nleft))<0){
if(errno==EINTR)
nread=0;
else return -1;
}
else if(nread==0)
break;/*File end*/
nleft-=nread;
ptr+=nread;
}
return (n-nleft);
}
ssize_t writen(int fd,const char *vptr,size_t n){
ssize_t nleft=n,nwritten=0;
const char *ptr;
ptr=vptr;
while(nleft>0){
if((nwritten=write(fd,ptr,nleft))<=0){
if(errno==EINTR)
nwritten=0;
else
return -1;
}
nleft-=nwritten;
ptr+=nwritten;
}
return n;
}
int Readn(int fd,void *buff,int n){
int rd=readn(fd,buff,n);
if(rd<0)
sys_exit_msg("readn error:");
return rd;
}
int Writen(int fd,const char *ptr,ssize_t n){
int wt=writen(fd,ptr,n);
if(wt<0)
sys_exit_msg("write error:");
return wt;
}
void Chdir(const char *path){
if(chdir(path)<0)
sys_exit_msg("change to file storage directror error");
}
//#define DB_INSTALL
/*
下面的代码是针对数据库写的,数据库作为了一个编译选项,如果 #define DB_INSTALL了就会编译数据库进去,
下面的代码不是为了偷懒写的,是为了代码容易阅读和修改,还有数据库的无关,我比划着JAVA里常用的几个
类的函数名做了代码封装,一个是DB结构体,用来连接,初始化,查询(主要),一个是Result结构体,用来查询
对结果的处理。因为这个小程序本身用到的功能不多,所以只是 ¥按需定义!
*/
#ifdef DB_INSTALL
#include <mysql.h>
struct RESULT{
MYSQL_RES *result;
MYSQL_ROW row;
int (*isEmpty)(struct RESULT *); /*判空*/
void (*next)(struct RESULT *); /*取一行*/
void (*clean)(struct RESULT *); /*善后处理*/
char *(*GetCurrRowCol)(struct RESULT *,int);/*取当前行的某列*/
};
struct DB{
MYSQL DBconn;
MYSQL_RES *result;
/*Protect the result until it is fetched*/
pthread_mutex_t qlock; /*互斥*/
int (*Query)(struct DB *,const char *,struct RESULT *);
void (*close)(struct DB *);
};
struct DB myDB;
void DB_close(struct DB *db){
mysql_close(&(db->DBconn));
}
typedef struct RESULT Result;
int isResultEmpty(struct RESULT *re){
if((re==NULL) || !mysql_num_rows(re))
return 1;
return 0;
}
void CleanResult(struct RESULT *re){
mysql_free_result(re->result);
}
char *get_col_curr_row(Result *re,int col){
if(mysql_num_fields(re->result)<col)
return NULL;
return ((re->row)[col-1]);
}
void GetNext(struct RESULT *re){
re->row=mysql_fetch_row(re->result);
}
int DatabaseQuery(struct DB *db,const char *qu,Result *res){
Pthread_mutex_lock(&(db->qlock));
if(mysql_query(&(db->DBconn),qu)) {
Pthread_mutex_unlock(&(db->qlock));/*first unlock then return*/
return -1;
}
res->result=mysql_store_result(&(db->DBconn));
Pthread_mutex_unlock(&(db->qlock));
return 0;
}
void Result_init(Result *re){
re->result=NULL;
re->next=GetNext;
re->clean=CleanResult;
re->isEmpty=isResultEmpty;
re->GetCurrRowCol=get_col_curr_row;
}
void DB_init(struct DB *db){
mysql_init(&(db->DBconn));
if(!mysql_real_connect(&(db->DBconn),"localhost","mysql","","test",0,NULL,0))
sys_exit_msg("ERROR in connection Database test:");
db->Query=DatabaseQuery;
db->close=DB_close;
}
#endif
/**************************************************************
SERV01。C:
**************************************************************/
#include "unisock.h"
void UploadFile_Pool_Start(void);
void UploadFile_Thread_Batch_Create(int);
void UploadFile_Thread_Batch_Create(int);
void *uploadfile_servfunc(void *);
/*NOTE:this is shared by ALL threads!!*/
/*
一个类似生产者消费者的队列,主线程统一接受ACCPT(),然后把 FILE DESCRIPTOR 传给其余的线程,此循环队列的大小同
服务器的上传文件服务线程池里线程数目,两者同时单调增加!所以队列不满就有线程IDLE,可以保证把fd取走。
*/
int iget,iput;
struct UpThread *queue_ptr;
int main(int argc,char *argv[]){
int listenfd;
struct sockaddr_in cliaddr,servaddr;
socklen_t addrlen,clilen;
clilen=sizeof(struct sockaddr_in);
if( (listenfd=socket(AF_INET,SOCK_STREAM,0))<0)
sys_exit_msg("Server listen error:");
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
servaddr.sin_port=htons(UPLOAD_FILE_PORT);
if(bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0)
sys_exit_msg("SERVER bind error:");
if(listen(listenfd,LISTENMAX)<0)
sys_exit_msg("SERVER listen error occoured:");
/*allocate an array for threads to fetch the already accepted socket'fd,it is a round acyclic queue*/
if((queue_ptr=(struct UpThread *)calloc(NORM_UPLOADFILE_THREAD,sizeof(struct UpThread)))<0)
sys_exit_msg("Init threads'shared queue error:");
iget=0;iput=0;
/*Connection to DB*/
#ifdef DB_INSTALL
DB_init(&myDB);
#endif
UploadFile_Pool_Start();/*线程池建立!*/
/**THe main thread is now on!*/
while(1){
int connectionfd;
Pthread_mutex_lock(&Uploadfile_clifd_mutex);
while((iget+1)%UPLOADFILE_THREAD_CNT==iput)
Pthread_cond_wait(&Uploadfile_buff_notFull,&Uploadfile_clifd_mutex);
if( (connectionfd=accept(listenfd,(struct sockaddr *)&cliaddr,&clilen))<0)
sys_exit_msg("Serv Upload file Main Thread ERROR:");
queue_ptr[iput].fd=connectionfd;
inet_ntop(AF_INET,&cliaddr.sin_addr,&queue_ptr[iput].addrstr,INET_ADDRSTRLEN);/*no use*/
iput=(iput+1)%UPLOADFILE_THREAD_CNT;
/*如果线程池需要扩大(可用线程小于THREAD_NEED_ADD而且线程池未达到最大值)*/
if(((iput-iget+UPLOADFILE_THREAD_CNT)%UPLOADFILE_THREAD_CNT)>=(UPLOADFILE_THREAD_CNT-THREAD_NEED_ADD) &&
UPLOADFILE_THREAD_CNT<MAX_UPLOADFILE_THREAD){
/*Then extend the thread pool*/
/*calculate the number of threads need to add,no more than THREAD_INCREMENT*/
int number=(UPLOADFILE_THREAD_CNT+THREAD_INCREMENT)<MAX_UPLOADFILE_THREAD ? THREAD_INCREMENT:MAX_UPLOADFILE_THREAD-UPLOADFILE_THREAD_CNT;
UploadFile_Thread_Batch_Create(number);
UPLOADFILE_THREAD_CNT+=number; /*update*/
/*you should not only add threads also should get new queue_ptr!!*/
/*NOTE that we are holding mutex now,so do your work quickly*/
if((queue_ptr=(struct UpThread *)realloc(queue_ptr,UPLOADFILE_THREAD_CNT))==NULL)
sys_exit_msg("Some thing which should never happen if you'v calculated carefully happened!!");
}
Pthread_cond_signal(&Uploadfile_buff_notEmpty);
Pthread_mutex_unlock(&Uploadfile_clifd_mutex);
}
return 0; /*never returns!*/
}
void UploadFile_Thread_Batch_Create(int arg){
int i=0;
fprintf(stdout,"Now in UploadFile_Thread_Batech_Create()...
");
fflush(stdout);
for(;i<arg;i++){
pthread_t tid;
if(pthread_create(&tid,NULL,uploadfile_servfunc,NULL)<0)
sys_exit_msg("upload file thread pool start error:");
}
}
void UploadFile_Pool_Start(){
fprintf(stdout,"UploadFile_Pool_Start...
");
fflush(stdout);
UploadFile_Thread_Batch_Create(NORM_UPLOADFILE_THREAD);
}
void do_the_fileupload(struct UpThread);
void *uploadfile_servfunc(void *arg){
struct UpThread conn;
fprintf(stdout,"I am Thread%d in uploadfile_servfunc
",pthread_self());
fflush(stdout);
/*endless loop*/
while(1){
Pthread_mutex_lock(&Uploadfile_clifd_mutex);
while(iget==iput)
Pthread_cond_wait(&Uploadfile_buff_notEmpty,&Uploadfile_clifd_mutex);
conn=queue_ptr[iget];
iget=(iget+1)%UPLOADFILE_THREAD_CNT;
// fprintf(stdout,"iget=%d,iput=%d
",iget,iput);
Pthread_cond_signal(&Uploadfile_buff_notFull);
Pthread_mutex_unlock(&Uploadfile_clifd_mutex);
do_the_fileupload(conn);
}
}
/*This func use the current time and random() to generate a string the new file name stored in the server file system*/
void random_filename(char *ptr,size_t len){
int rand=random();
int curr=time(NULL);
snprintf(ptr,FILENAME_SIZE,"%d%d",curr,rand);
ptr[FILENAME_SIZE-1]=' ';
}
void do_the_fileupload(struct UpThread thread){
/*first read the start 30 bytes as file name,and then 20 bytes as file size*/
int size;
char *buff;
int sockfd=thread.fd;
if((buff=(char *)malloc(BUFFSIZE))==NULL)
sys_exit_msg("Memory tight?:");
fprintf(stdout,"I am Thread%d supply the service
",pthread_self());
if(readn(sockfd,buff,300)<0){ /*300 is enough to contain the infomation*/
#ifdef DEBUG
fprintf(stderr,"ERROR happened in Thread%d when reading the uploader's first Package
",pthread_self());
#endif
free(buff);
close(sockfd);
return ; /*stop serve to this client*/
}
fprintf(stdout,"%s",buff);
buff[USER_NAME_LEN-1]=buff[USER_NAME_LEN+USER_PASSWD_LEN-1]=buff[USER_NAME_LEN+USER_PASSWD_LEN+FILENAME_SIZE-1]=' ';
/*Check the DB to see if this user can upload files!*/
#ifdef DB_INSTALL
char *query=(char *)malloc(300);
if(!query) sys_exit_msg("Memory tight?:");
snprintf(query,300,"select files space_use from user where id=%s AND passwd=%s",buff,buff+USER_NAME_LEN);
char username[USER_NAME_LEN];
strncpy(username,buff,USER_NAME_LEN);
char actual_filename[FILENAME_SIZE];
strncpy(actual_filename,buff+USER_NAME_LEN+USER_PASSWD_LEN);
Result re;
Result_init(&re);
myDB.Query(&myDB,query,&re);
int code=0;
if(re.isEmpty()){
code=1; /*user info invalid!*/
goto CODE_HANDLE;
}
int space=0,files=0;
re.next();
sscanf(re.GetCurrRowCol(0),"%d",&files);
sscanf(re.GetCurrRowCol(1),"%d",&space_use);
if(space>= ONE_USER_UPLOAD_SPACE){
code=2;
goto CODE_HANDLE;
}
if(files>= MAX_FILENUMBER_PER_USER){
code=3;
goto CODE_HANDLE;
}
/*check if there is already exites one file with the same name owned by the user */
snprintf(query,300,"select file_name from user,userfiles where user.id=userfiles.id AND id=%s AND file_name=%s ",buff,buff+USER_NAME_LEN+USER_PASSWD_LEN);
Result_init(&re);
myDB.Query(&myDB,query,&re);
if(!re.isEmpty()){
code=4; /*another file with the same file already exists!*/
goto CODE_HANDLE;
}
CODE_HANDLE:
switch(code){
case 0:
buff[0]='0';
strncpy(buff+1,"EVERYTHING IS OK!",sizeof("EVERYTHING IS OK!"));
break;
case 1:
buff[0]='1';
strncpy(buff+1,"USER INFO INCORRECT!",sizeof("USER INFO INCORRECT!"));
break;
case 2:
buff[0]='2';
strncpy(buff+1,"USER SPACE USE UP!",sizeof("USER SPACE USE UP!"));
break;
case 3:
buff[0]='3';
strncpy(buff+1,"USER FILE NUMBER IS TO THE MAX!",sizeof("USER FILE NUMBER IS TO THE MAX!"));
break;
case 4:
buff[0]='4';
strncpy(buff+1,"ALREADY EXISTS A FILE WITH THE SAME NAME!",sizeof("ALREADY EXISTS A FILE WITH THE SAME NAME!"));
break;
default:
buff[0]='5';
strncpy(buff+1,"SOME ERROR OCCURED!",sizeof("SOME ERROE OCCURED!"));
break;
}
writen(sockfd,buff,300);
if(code){
close(sockfd);
re.clean(&re);
free(query);
return;
}
re.clean(&re);
#endif
#ifndef DB_INSTALL
buff[0]='0';
strncpy(buff+1,"EVERYTHING IS OK!",sizeof("EVERYTHING IS OK!"));
writen(sockfd,buff,300);
#endif
char *filename=NULL;
if((filename=(char*)malloc(FILENAME_SIZE))==NULL)
sys_exit_msg("Memory tight:");
random_filename(filename,FILENAME_SIZE);
Chdir(USER_UPLOADFILE_DIR);
int fd;
if((fd=open(filename,O_CREAT|O_RDWR,S_IRWXU))<0){
free(filename);free(buff);
close(sockfd);
#ifdef DEBUG
sys_exit_msg("Error in creating upload file:");
#endif
return;
}
#ifdef DB_INSTALL
snprintf(query,300,"INSERT INTO userfiles(server_filename,id,filename,size) values(%s,%s,%s,0)",filename,username,actual_filename);
myDB.Query(&myDB,query,&re);
#endif
int rd=0,flen=0;
/*flen check the file length is valid*/
while((rd=readn(sockfd,buff,BUFFSIZE))>0){
if( ( (flen+rd)>MAX_UPLOADFILE_SIZE ) /* || (already_use+rd)>ONE_USER_UPLOAD_SPACE*/)
break;
if(writen(fd,buff,rd)<0)
break; /*Some error happened,stoppting service*/
#ifdef DB_INSTALL
/*The updates should be done in a transaction */
snprintf(query,300,"update userfiles set size=size+%d where id=%s AND file_name=%s",rd,username,actual_filename);
myDB.Query(&myDB,query,&re);
snprintf(query,300,"update user set space_use=space_use+%d where id=%s",rd,username);
myDB.Query(&myDB,query,&re);
#endif
flen+=rd;
}
if(rd<0)
sys_exit_msg("Read error 02:");
close(fd);
close(sockfd);
free(filename);
free(buff);
#ifdef DB_INSTALL
free(query);
#endif
fprintf(stdout,"Thread%d supply over!
",pthread_self());
return;
}
/************************************************************
SERV02。C
***********************************************************/
#include "unisock.h"
void DownloadFile_Pool_Start(void);
void DownloadFile_Thread_Batch_Create(int);
void *downloadfile_servfunc(void *);
void do_the_download_work(int);
void alrm_sig(int);
void (*restore)(int);
/*Global variable shared by ALL threads*/
int downlistenfd=0;
int Downloadfile_BusyThread_count;
int main(int argc,char *argv[]){
struct sockaddr_in servaddr;
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
servaddr.sin_port=htons(DOWNLOAD_FILE_PORT);
if((downlistenfd=socket(AF_INET,SOCK_STREAM,0))<0)
sys_exit_msg("SERVER Download file socket() error 01:");
if(bind(downlistenfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0)
sys_exit_msg("SERVER Download file bind() error 02");
if(listen(downlistenfd,LISTENMAX)<0)
sys_exit_msg("SERVER Download file listen() error 03:");
#ifdef DB_INSTALL
myDB.init(&myDB);
#endif
DownloadFile_Pool_Start();
if( (restore=signal(SIGALRM,alrm_sig))==SIG_ERR)
sys_exit_msg("Install alarm signal handler error:");
alarm(CHECK_DOWNLOAD_SERVER_TIME);
//check the download server is busy or not
while(1);//never stop!
return 0;
}
void DownloadFile_Pool_Start(){
fprintf(stdout,"DownloadFile_Pool_Start...
");
fflush(stdout);
DownloadFile_Thread_Batch_Create(NORM_DOWNLOADFILE_THREAD);
}
void DownloadFile_Thread_Batch_Create(int arg){
int i=0;
fprintf(stdout,"Now in DownloadFile_Thread_Batech_Create()...
");
fflush(stdout);
for(;i<arg;i++){
pthread_t tid;
if(pthread_create(&tid,NULL,downloadfile_servfunc,NULL)<0)
sys_exit_msg("download file thread pool start error:");
}
}
void *downloadfile_servfunc(void *arg){
int acceptfd;
struct sockaddr *cliaddr_ptr=NULL;
socklen_t *clilen_ptr=NULL;
if((cliaddr_ptr=(struct sockaddr*)malloc(sizeof(struct sockaddr)))==NULL)
sys_exit_msg("Memory tight ?:");
if((clilen_ptr=(socklen_t *)malloc(sizeof(socklen_t)))==NULL)
sys_exit_msg("Memory tight ?:");
#ifdef DEBUG
fprintf(stdout,"NOW I am Thread%d entering the downloadfile_servfunc()
",pthread_self());
#endif
while(1){
Pthread_mutex_lock(&Downloadfile_clifd_mutex);
fprintf(stderr,"I am Thread%d get the lock now
",pthread_self());
if((acceptfd=accept(downlistenfd,cliaddr_ptr,clilen_ptr))<0){
fprintf(stderr,"Thread%d accept() error:%s
",pthread_self(),strerror(errno));
Pthread_mutex_unlock(&Downloadfile_clifd_mutex);
continue;
}
Pthread_mutex_unlock(&Downloadfile_clifd_mutex);
#ifdef DEBUG
fprintf(stdout,"Thread%d acceptfd=%d
",pthread_self(),acceptfd); fflush(stdout);
#endif
Pthread_mutex_lock(&Downloadfile_BusyThread_CNT);
Downloadfile_BusyThread_count++;/*忙线程记数值增一*/
Pthread_mutex_unlock(&Downloadfile_BusyThread_CNT);
do_the_download_work(acceptfd);
Pthread_mutex_lock(&Downloadfile_BusyThread_CNT);
Downloadfile_BusyThread_count--;/*忙线程记数值减一*/
Pthread_mutex_unlock(&Downloadfile_BusyThread_CNT);
}
}
void do_the_download_work(int fd){
#ifdef DEBUG
fprintf(stdout,"NOW I am Thread%d entering the do_the_download_work()
",pthread_self());
fflush(stdout);
#endif
char *buff=NULL;
if((buff=(char *)malloc(BUFFSIZE))==NULL)
sys_exit_msg("Memory Tight? :");
/*Read the user's username(length:USER_NAME_LEN) and passwd(len:USER_PASSWD_LEN),filename(len:FILENAME_SIZE)
*/
if(readn(fd,buff,300)<0){/*300 is enough to contain these info*/
/*Some wrong occured,stop service to this client,do the clean up work*/
free(buff);
close(fd);
return;
}
buff[USER_NAME_LEN-1]=buff[USER_NAME_LEN+USER_PASSWD_LEN-1]=buff[USER_NAME_LEN+USER_PASSWD_LEN+FILENAME_SIZE-1]=' ';
puts(buff);
int file;
char *filename=NULL;/*to store the actual filename stored in server file system*/
if((filename=(char *)malloc(255))==NULL) /*255 means the max filename length the server file system allowed*/
sys_exit_msg("Memory Tight?:");
if(check_user_info(buff,buff+USER_NAME_LEN)<0){
buff[0]='1';
strncpy(buff+1,"USER INFO INCORRECT!",sizeof("USER INFO INCORRECT!"));
writen(fd,buff,300);
#ifdef DEBUG
fprintf(stdout,"Checked user info
");
#endif
}
else if( lookup_in_own(buff,buff+USER_NAME_LEN+USER_PASSWD_LEN,filename,255)<0){
buff[0]='2';
strncpy(buff+1,"FILE NAME INCORRECT!",sizeof("FILE NAME INCORRECT!"));
writen(fd,buff,300);
#ifdef DEBUG
fprintf(stdout,"Checked file info invalid
");
#endif
}
else if((file=open(filename,O_RDONLY))<0){
buff[0]='3';
strncpy(buff+1,"FILE OPEN ERROR IN THE SERVER!",sizeof("FILE OPEN ERROR IN THE SERVER!"));
writen(fd,buff,300);
#ifdef DEBUG
fprintf(stdout,"open file error
");
#endif
}
else{
buff[0]='0';
strncpy(buff+1,"Downloading....!",sizeof("Downloading....!"));
writen(fd,buff,300);
#ifdef DEBUG
fprintf(stdout,"Downloading started! Thread%d is serving
",pthread_self());
#endif
}
switch(buff[0]){
case '0':break;
/*fall through!!!!*/
case '1':
case '2':
case '3':
default: /* do the clean up work and return*/
close(file);
close(fd);
free(buff);
free(filename);
return;
}
/*do the right work now!*/
int rd=0;
while((rd=readn(file,buff,BUFFSIZE))>0)
writen(fd,buff,rd);
close(fd);
close(file);
free(buff);
free(filename);
}
void alrm_sig(int num){
fprintf(stdout,"NOW I am Thread%d checking the server,in the alrm signal handler,COUNT=%d
",pthread_self(), Downloadfile_BusyThread_count);
fflush(stdout);
if(Downloadfile_BusyThread_count>( DOWNLOADFILE_THREAD_CNT- THREAD_NEED_ADD) &&
(DOWNLOADFILE_THREAD_CNT< MAX_DOWNLOADFILE_THREAD) ){
int needadd=(DOWNLOADFILE_THREAD_CNT+THREAD_NEED_ADD)<MAX_DOWNLOADFILE_THREAD?THREAD_NEED_ADD:(MAX_DOWNLOADFILE_THREAD-DOWNLOADFILE_THREAD_CNT);
DownloadFile_Thread_Batch_Create(needadd);
alarm(0); /*cancle the check,because the pool is to the extend,and restore the ALARM signal handler*/
if( signal(SIGALRM,restore)==SIG_ERR)
sys_exit_msg("restore alarm signal handler error:");
}else
alarm(CHECK_DOWNLOAD_SERVER_TIME);
}
int check_user_info(const char *name,const char *pass){
/* DB check! */
#ifdef DB_INSTALL
char *query=(char *)malloc(300);
if(!query) sys_exit_msg("Memory tight?:");
snprintf(query,300,"select id from user where id=%s AND passwd=%s",name,pass);
Result re;
Result_init(&re);
myDB.Query(&myDB,query,&re);
if(re.isEmpty()){
free(query);
re.clean(&re);
return -1;
}
re.clean(&re);
free(query);
#endif
return 0;
}
int lookup_in_own(const char *user,const char *name,char *actualfile,int len){
/*DB check*/
#ifdef DB_INSTALL
char *query=(char *)malloc(300);
if(!query) sys_exit_msg("Memory tight?:");
snprintf(query,300,"select server_filename from userfiles where id=%s AND file_name=%s",user,name);
Result re;
Result_init(&re);
myDB.Query(&myDB,query,&re);
if(re.isEmpty()){
re.clean(&re);
free(query); /* file does not existes!*/
return -1;
}
re.next();
char *oo=NULL;
oo=re.GetCurrRowCol(0);
int i=(len-1)>strlen(oo)?strlen(oo):(len-1);
strncpy(actualfile,oo,len);
actualfile[len]=' ';
free(query);
re.clean(&re);
#endif
#ifndef DB_INSTALL
strncpy(actualfile,name,strlen(name)+1);
#endif
return 0;
}
/*****************************************
CLI01。C
*******************************************/
#include "unisock.h"
int path2filename(const char *,char *,int);
int main(int argc,char* argv[]){
int sockfd;
struct sockaddr_in servaddr;
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_port=htons(UPLOAD_FILE_PORT);
if(argc<=2)
sys_exit_msg("Usage:command servaddr file...");
if(inet_pton(AF_INET,argv[1],&servaddr.sin_addr)<0)
sys_exit_msg("check server address:");
char *buff=NULL;
if((buff=(char *)malloc(BUFFSIZE))==NULL)
sys_exit_msg("Memory tight ? :");
struct stat fs;
int i=0;
char name[USER_NAME_LEN];
char passwd[USER_PASSWD_LEN];
fprintf(stdout,"USER NAME:");fflush(stdout);
if(fgets(name,USER_NAME_LEN,stdin)==NULL)
sys_exit_msg("USER NAME INCORRECT");
name[strlen(name)-1]=' ';
fprintf(stdout,"PASSWD:");fflush(stdout);
fgets(passwd,USER_PASSWD_LEN,stdin);
passwd[strlen(passwd)]=' ';
for(;i<(argc-2);i++){
/*
处理每一个文件,注意此程序只是我练一下手的,所以并没有遵循什么“协议”!I define the protocol。
第一个300字节包括用户名和密码和文件名,然后等待服务器的确认信息(300字节),如果可以,上传。
*/
if((sockfd=socket(AF_INET,SOCK_STREAM,0))<0){
fprintf(stderr,"Socket Error in uploading file(%s):%s
",argv[i+2],strerror(errno));
free(buff); /*no need to free,we exit next*/
exit(errno);
}
//puts("socket ok!");
if(connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0){
fprintf(stderr,"Connection to the server error when try to upload file(%s):%s
",argv[i+2],strerror(errno));
exit(errno);
}
//puts("connect() ok");
strncpy(buff,name,USER_NAME_LEN);
strncpy(buff+USER_NAME_LEN,passwd,USER_PASSWD_LEN);
char *filename=NULL;
if((filename=(char *)malloc(FILENAME_SIZE))==NULL)
sys_exit_msg("Memory tight ? :");
/*filename should be null terminated!!*/
if(path2filename(argv[i+2],filename,FILENAME_SIZE)<0){
fprintf(stderr,"the %dth file path(%s) incorrect! Ignore this one!
",i+1,argv[i+2]);
/* do NOT free buff!! JUST CLOSE sockfd!*/
close(sockfd); free(filename);
continue;
}
strncpy(buff+USER_NAME_LEN+USER_PASSWD_LEN,filename,strlen(filename)+1);
// printf("user=%s,passwd=%s
",buff,buff+USER_NAME_LEN);
//fprintf(stdout,"valid filename =%s
",filename);
if(writen(sockfd,buff,300)<0){
fprintf(stderr,"the %dth file path(%s) Uploading canceled!
",i+1,argv[i+2]);
close(sockfd);free(filename);
continue;
}
if(readn(sockfd,buff,300)<0){
fprintf(stderr,"the %dth file path(%s) Uploading canceled!
",i+1,argv[i+2]);
close(sockfd);free(filename);
continue;
}
//puts("HERE 01");
if(buff[0]!='0'){
fprintf(stdout,"%s",buff+1);
fprintf(stderr,"the %dth file path(%s) Uploading canceled!
",i+1,argv[i+2]);
close(sockfd);free(filename);
continue;
}
int fd;
if((fd=open(argv[i+2],O_RDONLY))<0)
sys_exit_msg("File open error: ");
stat(argv[i+2],&fs);
/*查看文件的信息,包括文件类型,文件大小(因为文件上传有每个文件限制大小,但是如果硬要上传,也可以。服务器端会保证不超限)*/
if(!S_ISREG(fs.st_mode) || fs.st_size> MAX_UPLOADFILE_SIZE){
fprintf(stdout,"File %s: not regular file or too large. Upload any way(yes or no)?",argv[i+2]);
char answer[4];
if(fgets(answer,4,stdin)==NULL || strncmp(answer,"yes",4)){
fprintf(stderr,"the %dth file (path=%s) Uploading canceled!
",i+1,argv[i+2]);
close(sockfd);free(filename);close(fd);
continue;
}
}
fprintf(stdout,"file %s is opened
",argv[i+2]);
int re=0,wt=0;
while((re=readn(fd,buff,50))>0){
if((wt+re)>MAX_UPLOADFILE_SIZE)
break;
if(writen(sockfd,buff,re)<0){
fprintf(stderr,"the %dth file (path=%s) Uploading canceled!Some data may have Uploaded!
",i+1,argv[i+2]);
close(sockfd);free(filename);
continue;
}
wt+=re;
}
if(re<0){
sys_exit_msg("read file error");
}
fprintf(stdout,"file %s read done! read %d bytes
",argv[i+2],wt);
close(fd);
close(sockfd);
}
free(buff);
return 0;
}
/*这个函数的代码可能写的不是很好,我单独调这个函数花了1个小时(脑子疲劳)*/
int path2filename(const char *path,char *filename,int len){
int pathlen=strlen(path);
int i=0;
int j=-1;
while(i<pathlen){
if(path[i]==PATH_ESCAPE)
j=i+1;
i++;
}
if(j==pathlen)
return -1;
if(j==-1){
i=(pathlen>(len-1))?(len-1):pathlen;
strncpy(filename,path,i);
}
else{
i=((pathlen-j)>(len-1) )?(len-1):(pathlen-j);
strncpy(filename,path+j,i);
}
filename[i]=' ';
return 1;
}
/**************************************************
CLI02。C
**************************************************/
#include "unisock.h"
int main(int argc,char* argv[]){
int sockfd;
struct sockaddr_in servaddr;
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_port=htons(DOWNLOAD_FILE_PORT);
if(argc<=2)
sys_exit_msg("Usage:command servaddr file...");
if(inet_pton(AF_INET,argv[1],&servaddr.sin_addr)<0)
sys_exit_msg("check server address:");
char *buff=NULL;
if((buff=(char *)malloc(BUFFSIZE))==NULL)
sys_exit_msg("Memory tight ? :");
char name[USER_NAME_LEN];
char passwd[USER_PASSWD_LEN];
fprintf(stdout,"USER NAME:");fflush(stdout);
if(fgets(name,USER_NAME_LEN,stdin)==NULL)
sys_exit_msg("USER NAME INCORRECT");
name[strlen(name)-1]=' ';
fprintf(stdout,"PASSWD:");fflush(stdout);
fgets(passwd,USER_PASSWD_LEN,stdin);
passwd[strlen(passwd)-1]=' ';
int i=0;/*解释类似CLI01*/
for(;i<(argc-2);i++){
if((sockfd=socket(AF_INET,SOCK_STREAM,0))<0){
fprintf(stderr,"Socket Error in uploading the %dth file:%s
",i+1,strerror(errno));
free(buff); /*no need to free,we exit next*/
exit(errno);
}
fprintf(stdout,"sockfd=%d
",sockfd);
if(connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0){
fprintf(stderr,"Connection to the server error when the %dth file:%s
",i+1,strerror(errno));
exit(errno);
}
int fd;
if((fd=open(argv[i+2],O_CREAT|O_EXCL|O_RDWR,S_IRUSR|S_IWUSR))<0){
fprintf(stderr,"The file error:%s
",strerror(errno));
continue;
}
strncpy(buff,name,USER_NAME_LEN);
strncpy(buff+USER_NAME_LEN,passwd,USER_PASSWD_LEN);
strncpy(buff+USER_NAME_LEN+USER_PASSWD_LEN,argv[i+2],FILENAME_SIZE);
// fprintf(stderr,"name=%s,passwd=%s
",buff,buff+USER_NAME_LEN);
if(writen(sockfd,buff,300)<0){
fprintf(stderr,"the %dth file path(%s) Uploading canceled!
",i+1,argv[i+2]);
close(sockfd);close(fd);
continue;
}
// puts("HERE 01");
if(readn(sockfd,buff,300)<0){
fprintf(stderr,"the %dth file path(%s) Uploading canceled!
",i+1,argv[i+2]);
close(sockfd);close(fd);
continue;
}
//puts("HERE 02");
if(buff[0]!='0'){
fprintf(stdout,"%s",buff+1);
fprintf(stderr,"the %dth file path(%s) Uploading canceled!
",i+1,argv[i+2]);
close(sockfd);close(fd);
continue;
}
int re=0,wt=0;
while((re=readn(sockfd,buff,BUFFSIZE))>0){
if(writen(fd,buff,re)<0)
break;
wt+=re;
}
fprintf(stdout,"File %s downloading over. File size %d
",argv[i+2],wt);
close(sockfd);
close(fd);
}//end for
return 0;
}
|
|
|
----
学生一个!
|
|
[Original]
[Print]
[Top]
|
|
[Original]
[Print]
[Top]
|
UNISOCK。H是一个通用的头文件,我并没有进行分离,
SERV01。C是上传文件的服务,CLI01。C是上传的客户端
SERV02。C是下载文件的服务,CLI02。C是下载的客户端
|
|
|
----
学生一个!
|
|
[Original]
[Print]
[Top]
|
|
[Original]
[Print]
[Top]
|
我昨天晚上突然想到一个不好的地方,就是SERV02.C中的代码
alarm(CHECK_DOWNLOAD_SERVER_TIME);
//check the download server is busy or not
while(1);//never stop!
中的while(1)语句,应该改为pause();比较好,这样主线程就不会得到执行(虽然是空),而是仅在闹钟是醒
而相应的闹钟处理函数,在重新设置完后,也要pause(),防止主线程返回后退出执行.
|
|
|
----
学生一个!
|
|
[Original]
[Print]
[Top]
|
|