温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

PHP编程中尝试程序并发的方式有哪些

发布时间:2021-08-31 13:55:42 阅读:147 作者:小新 栏目:开发技术
PHP开发者专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这篇文章将为大家详细讲解有关PHP编程中尝试程序并发的方式有哪些,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。


1.curl_multi_init
文档中说的是 Allows the processing of multiple cURL handles asynchronously. 确实是异步。这里需要理解的是select这个方法,文档中是这么解释的Blocks until there is activity on any of the curl_multi connections.。了解一下常见的异步模型就应该能理解,select, epoll,都很有名

<?php
// build the individual requests as above, but do not execute them
$ch_1 curl_init('https://www.jb51.net/');
$ch_2 curl_init('https://www.jb51.net/');
curl_setopt($ch_1, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch_2, CURLOPT_RETURNTRANSFER, true);

// build the multi-curl handle, adding both $ch
$mh curl_multi_init();
curl_multi_add_handle($mh$ch_1);
curl_multi_add_handle($mh$ch_2);

// execute all queries simultaneously, and continue when all are complete
$running null;
do {
  curl_multi_exec($mh$running);
  $ch curl_multi_select($mh);
  if($ch !== 0){
    $info curl_multi_info_read($mh);
    if($info){
      var_dump($info);
      $response_1 curl_multi_getcontent($info['handle']);
      echo "$response_1 \n";
      break;
    }
  }
} while ($running 0);

//close the handles
curl_multi_remove_handle($mh$ch_1);
curl_multi_remove_handle($mh$ch_2);
curl_multi_close($mh);

这里我设置的是,select得到结果,就退出循环,并且删除 curl resource, 从而达到取消http请求的目的。

2.swoole_client
swoole_client提供了异步模式,我竟然把这个忘了。这里的sleep方法需要swoole版本大于等于1.7.21, 我还没升到这个版本,所以直接exit也可以。

<?php
$client new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
//设置事件回调函数
$client->on("connect", function($cli) {
  $req "GET / HTTP/1.1\r\n
  Host: www.jb51.net\r\n
  Connection: keep-alive\r\n
  Cache-Control: no-cache\r\n
  Pragma: no-cache\r\n\r\n";

  for ($i=0$i 3$i++) {
    $cli->send($req);
  }
});
$client->on("receive", function($cli$data){
  echo "Received: ".$data."\n";
  exit(0);
  $cli->sleep(); // swoole >= 1.7.21
});
$client->on("error", function($cli){
  echo "Connect failed\n";
});
$client->on("close", function($cli){
  echo "Connection close\n";
});
//发起网络连接
$client->connect('183.207.95.145'801);

3.process
哎,竟然差点忘了 swoole_process, 这里就不用 pcntl 模块了。但是写完发现,这其实也不算是中断请求,而是哪个先到读哪个,忽视后面的返回值。

<?php

$workers = [];
$worker_num 3;//创建的进程数
$finished false;
$lock new swoole_lock(SWOOLE_MUTEX);

for($i=0;$i<$worker_num $i++){
  $process new swoole_process('process');
  //$process->useQueue();
  $pid $process->start();
  $workers[$pid] = $process;
}

foreach($workers as $pid => $process){
  //子进程也会包含此事件
  swoole_event_add($process->pipe, function ($pipeuse($process$lock, &$finished) {
    $lock->lock();
    if(!$finished){
      $finished true;
      $data $process->read();
      echo "RECV: " . $data.PHP_EOL;
    }
    $lock->unlock();
  });
}

function process(swoole_process $process){
  $response 'http response';
  $process->write($response);
  echo $process->pid,"\t",$process->callback .PHP_EOL;
}

for($i 0$i $worker_num$i++) {
  $ret = swoole_process::wait();
  $pid $ret['pid'];
  echo "Worker Exit, PID=".$pid.PHP_EOL;
}

4.pthreads
编译pthreads模块时,提示php编译时必须打开ZTS, 所以貌似必须 thread safe 版本才能使用. wamp中多php正好是TS的,直接下了个dll, 文档中的说明复制到对应目录,就在win下测试了。 还没完全理解,查到文章说 php 的 pthreads 和 POSIX pthreads是完全不一样的。代码有些烂,还需要多看看文档,体会一下。

<?php
class Foo extends Stackable {
  public $url;
  public $response null;
  public function __construct(){
    $this->url = 'https://www.jb51.net';
  }
  public function run(){}
}

class Process extends Worker {
  private $text "";
  public function __construct($text,$object){
    $this->text = $text;
    $this->object = $object;
  }
  public function run(){
    while (is_null($this->object->response)){
      print " Thread {$this->text} is running\n";
      $this->object->response = 'http response';
      sleep(1);
    }
  }
}

$foo new Foo();

$a new Process("A",$foo);
$a->start();

$b new Process("B",$foo);
$b->start();

echo $foo->response;

5.yield
以同步方式书写异步代码:

<?php 
 
class AsyncServer { 
  protected $handler; 
  protected $socket; 
  protected $tasks = []; 
  protected $timers = []; 
 
  public function __construct(callable $handler) { 
    $this->handler = $handler; 
 
    $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); 
    if(!$this->socket) { 
      die(socket_strerror(socket_last_error())."\n"); 
    } 
    if (!socket_set_nonblock($this->socket)) { 
      die(socket_strerror(socket_last_error())."\n"); 
    } 
    if(!socket_bind($this->socket, "0.0.0.0"1234)) { 
      die(socket_strerror(socket_last_error())."\n"); 
    } 
  } 
 
  public function Run() { 
    while (true) { 
      $now microtime(true) * 1000; 
      foreach ($this->timers as $time => $sockets) { 
        if ($time $nowbreak; 
        foreach ($sockets as $one) { 
          list($socket$coroutine) = $this->tasks[$one]; 
          unset($this->tasks[$one]); 
          socket_close($socket); 
          $coroutine->throw(new Exception("Timeout")); 
        } 
        unset($this->timers[$time]); 
      } 
 
      $reads array($this->socket); 
      foreach ($this->tasks as list($socket)) { 
        $reads[] = $socket; 
      } 
      $writes NULL; 
      $exceptsNULL; 
      if (!socket_select($reads$writes$excepts01000)) { 
        continue; 
      } 
 
      foreach ($reads as $one) { 
        $len socket_recvfrom($one$data655350$ip$port); 
        if (!$len) { 
          //echo "socket_recvfrom fail.\n"; 
          continue; 
        } 
        if ($one == $this->socket) { 
          //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n"; 
          $handler $this->handler; 
          $coroutine $handler($one$data$len$ip$port); 
          if (!$coroutine) { 
            //echo "[Run]everything is done.\n"; 
            continue; 
          } 
          $task $coroutine->current(); 
          //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n"; 
          $socket socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); 
          if(!$socket) { 
            //echo socket_strerror(socket_last_error())."\n"; 
            $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error())); 
            continue; 
          } 
          if (!socket_set_nonblock($socket)) { 
            //echo socket_strerror(socket_last_error())."\n"; 
            $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error())); 
            continue; 
          } 
          socket_sendto($socket$task->data, $task->len, 0$task->ip, $task->port); 
          $deadline $now $task->timeout; 
          $this->tasks[$socket] = [$socket$coroutine$deadline]; 
          $this->timers[$deadline][$socket] = $socket; 
        } else { 
          //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n"; 
          list($socket$coroutine$deadline) = $this->tasks[$one]; 
          unset($this->tasks[$one]); 
          unset($this->timers[$deadline][$one]); 
          socket_close($socket); 
          $coroutine->send(array($data$len)); 
        } 
      } 
    } 
  } 
} 
 
class AsyncTask { 
  public $data; 
  public $len; 
  public $ip; 
  public $port; 
  public $timeout; 
 
  public function __construct($data$len$ip$port$timeout) { 
    $this->data = $data; 
    $this->len = $len; 
    $this->ip = $ip; 
    $this->port = $port; 
    $this->timeout = $timeout; 
  } 
} 
 
function AsyncSendRecv($req_buf$req_len$ip$port$timeout{ 
  return new AsyncTask($req_buf$req_len$ip$port$timeout); 
} 
 
function RequestHandler($socket$req_buf$req_len$ip$port{ 
  //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n"; 
  try { 
    list($rsp_buf$rsp_len) = (yield AsyncSendRecv($req_buf$req_len"127.0.0.1"23453000)); 
  } catch (Exception $ex) { 
    $rsp_buf $ex->getMessage(); 
    $rsp_len strlen($rsp_buf); 
    //echo "[Exception]$rsp_buf\n"; 
  } 
  //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n"; 
  socket_sendto($socket$rsp_buf$rsp_len0$ip$port); 
} 
 
$server new AsyncServer(RequestHandler); 
$server->Run(); 
 
?>

代码解读:

借助PHP内置array能力,实现简单的“超时管理”,以毫秒为精度作为时间分片;
封装AsyncSendRecv接口,调用形如yield AsyncSendRecv(),更加自然;
添加Exception作为错误处理机制,添加ret_code亦可,仅为展示之用。

关于“PHP编程中尝试程序并发的方式有哪些”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

php
AI

开发者交流群×