温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

LINUX怎么实现多线程进行cp复制

发布时间:2021-11-18 11:04:01 来源:亿速云 阅读:844 作者:小新 栏目:开发技术

这篇文章主要为大家展示了“LINUX怎么实现多线程进行cp复制”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“LINUX怎么实现多线程进行cp复制”这篇文章吧。

关于这个问题,意义虽然有限因为一般来说在复制文件的时候,实际的瓶颈来自于I/O,不管开启多少个线程实际上速度并不会快多少,但是为了练习多线程编程,
这里给出了一种C++代码实现的方式,代码附在最后。

实际上就是将一个文件分割为多个片段,开启多个线程进行同时复制,如果用户制定的并行大于服务器实际的CPU核数,程序会自动降级并行度为CPU核数,如果文件小于
100M则并行度始终为1。

root@bogon:/home/gaopeng/mmm# ./parcp log.log log10.log 2       
set parallel:2
Your cpu core is:4
real parallel:2
Will Create 2 Threads
140677902710528:0:174522367:3:4
140677894317824:174522368:349044736:3:4
Copy Thread:140677902710528 work 25%
Copy Thread:140677894317824 work 25%
Copy Thread:140677902710528 work 50%
Copy Thread:140677902710528 work 75%
Copy Thread:140677902710528 work 100%
Copy Thread:140677902710528 work Ok!!
Copy Thread:140677894317824 work 50%
Copy Thread:140677894317824 work 75%
Copy Thread:140677894317824 work Ok!!

复制完成后进行md5验证
root@bogon:/home/gaopeng/mmm# md5sum log.log
f64acc21f7187a865938b340b3eda198  log.log
root@bogon:/home/gaopeng/mmm# md5sum log10.log
f64acc21f7187a865938b340b3eda198  log10.log
可以看出校验是通过的

代码如下:

点击(此处)折叠或打开

  1. #include<iostream>

  2. #include <map>

  3. #include<stdint.h>

  4. #include<stdio.h>

  5. #include<string.h>

  6. #include<unistd.h>

  7. #include<sys/types.h>

  8. #include<sys/stat.h>

  9. #include <sys/sysinfo.h>

  10. #include<fcntl.h>

  11. #include<errno.h>

  12. #include <time.h>

  13. #include <stdarg.h>

  14. #include <stdlib.h>

  15. #include <pthread.h>

  16. #define MAX_BUFFER 65536

  17. using namespace std;




  18. pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;


  19. class thread_info

  20. {


  21.         private:

  22.                 uint64_t start_pos;

  23.                 uint64_t end_pos;

  24.                 int fdr;

  25.                 int fdw;

  26.         public:

  27.                 pthread_t t_id;

  28.                 static int do_id;


  29.         public:

  30.                 thread_info()

  31.                 {


  32.                         start_pos = 0;

  33.                         end_pos = 0;

  34.                         fdr = 0;

  35.                         fdw = 0;

  36.                         t_id = 0;

  37.                 }

  38.                 void set_start(uint64_t a)

  39.                 {


  40.                         start_pos = a;

  41.                 }

  42.                 void set_end(uint64_t a)

  43.                 {


  44.                         end_pos = a;

  45.                 }

  46.                 void set_fd(int a,int b)

  47.                 {


  48.                         fdr = a;

  49.                         fdw = b;

  50.                 }


  51.                 uint64_t get_start(void)

  52.                 {

  53.                         return start_pos;

  54.                 }

  55.                 uint64_t get_stop(void)

  56.                 {

  57.                         return end_pos;

  58.                 }

  59.                 int get_fdr(void)

  60.                 {

  61.                         return fdr;

  62.                 }

  63.                 int get_fdw(void)

  64.                 {

  65.                         return fdw;

  66.                 }


  67.                 void print(void)

  68.                 {

  69.                         cout<<start_pos<<":"<<end_pos<<":"<<t_id<<endl;

  70.                 }


  71. };


  72. int thread_info::do_id = 0;


  73. class ABS_dispatch

  74. {


  75.         public:

  76.                 ABS_dispatch()

  77.                 {


  78.                         par_thr = 0;

  79.                         max_cpu = 0;

  80.                 }

  81.                 virtual thread_info* call_thread(void) = 0;

  82.                 virtual void  make_init(uint32_t t_n) = 0;

  83.                 uint32_t getcpu(void)

  84.                 {

  85.                         return get_nprocs() ;

  86.                 }

  87.                 virtual ~ABS_dispatch(){

  88.                 }

  89.         protected:

  90.                 uint32_t par_thr;

  91.                 uint32_t max_cpu;

  92. };


  93. class dispatch:public ABS_dispatch

  94. {


  95.         public:

  96.                 typedef multimap<uint64_t,uint64_t>::iterator pair_piece_iterator;

  97.                 dispatch():ABS_dispatch()

  98.         {

  99.                 file_idr = 0;

  100.                 file_idw = 0;

  101.                 file_size = 0;

  102.                 tread_arr_p = NULL;

  103.         }

  104.                 virtual thread_info* call_thread(void);

  105.                 virtual void  make_init(uint32_t t_n)

  106.                 {

  107.                         max_cpu = getcpu(); //

  108.                         cout<<"Your cpu core is:"<<max_cpu<<"\n";

  109.                         if(t_n > max_cpu) //parallel

  110.                         {

  111.                                 cout<<"Parallel downgrad to cpu core:"<<max_cpu<<"\n";

  112.                                 par_thr = max_cpu;

  113.                         }

  114.                         else

  115.                         {

  116.                                 par_thr = t_n;

  117.                         }

  118.                 }

  119.                 void set_init(int file_idr,int file_idw)

  120.                 {


  121.                         file_size = lseek(file_idr,0,SEEK_END);

  122.                         if(file_size<100000000)

  123.                         {

  124.                                 cout<<"File small than 100M par = 1" <<"\n";

  125.                                 par_thr = 1;

  126.                         }

  127.                         this->file_idr = file_idr;

  128.                         this->file_idw = file_idw;

  129.                 }

  130.                 uint32_t real_par()

  131.                 {

  132.                         return par_thr;

  133.                 }

  134.                 virtual ~dispatch()

  135.                 {

  136.                         pair_piece.clear();

  137.                         delete [] tread_arr_p;

  138.                 }

  139.         private:

  140.                 int file_idr;

  141.                 int file_idw;

  142.                 multimap<uint64_t,uint64_t> pair_piece;

  143.                 uint64_t file_size;

  144.         public:

  145.                 thread_info* tread_arr_p;

  146. };





  147. static void* do_work(void* argc)

  148. {

  149.         uint64_t b;

  150.         uint64_t e;

  151.         int fdr;

  152.         int fdw;

  153.         char* buffer[MAX_BUFFER]={0};

  154.         thread_info* tread_arr_p;

  155.         uint64_t loopc = 0;

  156.         uint64_t loopc25 = 0;

  157.         uint64_t i = 0;

  158.         int m = 1;

  159.         pthread_t t_id;

  160.         tread_arr_p = static_cast<thread_info*>(argc);


  161.         //临界区 MUTEX

  162.         pthread_mutex_lock(&counter_mutex);

  163.         b = (tread_arr_p+ tread_arr_p->do_id)->get_start();

  164.         e = (tread_arr_p+ tread_arr_p->do_id)->get_stop();

  165.         fdr = (tread_arr_p+ tread_arr_p->do_id)->get_fdr();

  166.         fdw = (tread_arr_p+ tread_arr_p->do_id)->get_fdw();

  167.         t_id = (tread_arr_p+ tread_arr_p->do_id)->t_id ;

  168.         cout<< t_id <<":"<<b<<":"<<e<<":"<<fdr<<":"<<fdw<<"\n";

  169.         tread_arr_p->do_id++;

  170.         pthread_mutex_unlock(&counter_mutex);

  171.         //临界区

  172.         loopc = e/uint64_t(MAX_BUFFER);

  173.         loopc25 = loopc/(uint64_t)4;


  174.         while(i<loopc)

  175.         {


  176.                 if(i == loopc25*m )

  177.                 {

  178.                         cout<< "Copy Thread:"<<t_id<<" work "<<25*m<<"%\n";

  179.                         m++;

  180.                 }


  181.                 memset(buffer,0,MAX_BUFFER);

  182.                 pread(fdr,buffer,MAX_BUFFER,uint64_t(i*MAX_BUFFER));

  183.                 pwrite(fdw,buffer,MAX_BUFFER,uint64_t(i*MAX_BUFFER));

  184.                 i++;

  185.         }

  186.         memset(buffer,0,MAX_BUFFER);

  187.         pread(fdr,buffer,(e-uint64_t(i*MAX_BUFFER)),uint64_t(i*MAX_BUFFER));

  188.         pwrite(fdw,buffer,(e-uint64_t(i*MAX_BUFFER)),uint64_t(i*MAX_BUFFER));

  189.         cout<< "Copy Thread:"<<t_id<<" work Ok!!"<<"\n";

  190.         return NULL;

  191. }


  192. thread_info* dispatch::call_thread()

  193. {


  194.         int i = 0;

  195.         uint64_t temp_size = 0;

  196.         temp_size = file_size/par_thr;

  197.         tread_arr_p = new thread_info[par_thr];


  198.         cout<<"Will Create "<<par_thr<<" Threads\n";

  199.         //cout<<tread_arr_p<<endl;

  200.         //cout<<sizeof(thread_info)<<endl;

  201.         for(i = 0;i<par_thr-1;i++)

  202.         {

  203.                 pair_piece.insert( pair<uint64_t,uint64_t>(temp_size*i,temp_size*(i+1)-1 ));

  204.         }

  205.         pair_piece.insert( pair<uint64_t,uint64_t>(temp_size*i,file_size ));

  206.         i = 1;

  207.         for(pair_piece_iterator it =pair_piece.begin();it !=pair_piece.end() ;it++)

  208.         {

  209.                 //cout<<"--Thread: "<<i<<"\n";

  210.                 //cout<<it->first<<"\n";

  211.                 //cout<<it->second<<"\n";


  212.                 //cout<<tread_arr_p+(i-1)<<endl;

  213.                 (tread_arr_p+(i-1))->set_start(it->first);

  214.                 (tread_arr_p+(i-1))->set_end(it->second);

  215.                 (tread_arr_p+(i-1))->set_fd(file_idr,file_idw);

  216.                 pthread_create(&((tread_arr_p+(i-1))->t_id),NULL,do_work,static_cast<void*>(tread_arr_p));

  217.                 //(tread_arr_p+(i-1))->print();

  218.                 i++;

  219.         }


  220.         return tread_arr_p;

  221. }




  222. int main(int argc,char** argv)

  223. {


  224.         dispatch test;

  225.         thread_info* thread_info_p = NULL;

  226.         uint32_t real_par;

  227.         void *tret;


  228.         int fdr = open(argv[1],O_RDONLY|O_NOFOLLOW);

  229.         int fdw = open(argv[2],O_RDWR|O_CREAT|O_EXCL,0755);

  230.         cout<<"Author: gaopeng QQ:22389860 Blog:http://blog.itpub.net/7728585\n";

  231.         if(argc<4)

  232.         {

  233.                 cout<<"USAGE:parcp sourcefile destfile paralle\n";

  234.                 return -1;

  235.         }

  236.         if(fdr == -1 || fdw == -1)

  237.         {

  238.                 perror("open readfile:");

  239.                 return -1;

  240.         }

  241.         if(fdw == -1)

  242.         {

  243.                 perror("open wirtefile:");

  244.                 return -1;

  245.         }


  246.         if(sscanf(argv[3],"%u",&real_par) == EOF)

  247.         {

  248.                 perror("sscanf:");

  249.                 return -1;

  250.         }


  251.         cout<<"set parallel:"<<real_par<<"\n";


  252.         //cout<<lseek(fd,0,SEEK_SET) <<endl;

  253.         test.make_init(real_par);

  254.         test.set_init(fdr,fdw);

  255.         real_par = test.real_par();

  256.         cout<<"real parallel:" <<real_par<<"\n";

  257.         thread_info_p = test.call_thread();

  258.         for(int i = 0 ;i<real_par;i++)

  259.         {

  260.                 //cout<<(thread_info_p+i)->t_id<<endl;

  261.                 pthread_join((thread_info_p+i)->t_id,&tret);

  262.                 //cout<<reinterpret_cast<long>(tret)<<endl;

  263.         }

  264.         close(fdw);

  265.         close(fdr);

  266. }

以上是“LINUX怎么实现多线程进行cp复制”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI