src/Services/libMQTT/src/ClientMQTT.php line 249

Open in your IDE?
  1. <?PHP
  2. /**
  3.  * MQTT 3.1.1 library for PHP with TLS support
  4.  *
  5.  * @author Pekka Harjamäki <mcfizh@gmail.com>
  6.  * @license MIT
  7.  * @version 1.0.3
  8.  */
  9. //namespace LibMQTT;
  10. namespace  App\Services\libMQTT\src;
  11. /**
  12.  * Client class for MQTT
  13.  */
  14. class ClientMQTT {
  15.     /** @var int $timeSincePingReq      When the last PINGREQ was sent */
  16.     public  $timeSincePingReq;
  17.     /** @var int $timeSincePingResp     When the last PINGRESP was received */
  18.     public  $timeSincePingResp;
  19.     /** @var boolean $debug         Debug messages enabled? */
  20.     private $debug;
  21.     /** @var int $packet            ID of the next free packet */
  22.     private $packet;
  23.     /** @var array $topics          Array of topics we're subscribed to */
  24.     private $topics;
  25.     /** @var string $connMethod     Method used for connection */
  26.     private $connMethod;
  27.     /** @var resource $socket       Socket .. well.. socket */
  28.     private $socket;
  29.     /** @var string $serverAddress      Hostname of the server */
  30.     private $serverAddress;
  31.     /** @var string $serverPort     Port on the server */
  32.     private $serverPort;
  33.     /** @var string $clientID       ClientID for connection */
  34.     private $clientID;
  35.     /** @var string $caFile         CA file for server authentication */
  36.     private $caFile;
  37.     /** @var bool $verifyPeerName       Verify Certificate peer name */
  38.     private $verifyPeerName;
  39.     /** @var string $clientCrt      Certificate file for client authentication */
  40.     private $clientCrt;
  41.     /** @var string $clientKey      Key file for client authentication */
  42.     private $clientKey;
  43.     /** @var string $authUser       Username for authentication */
  44.     private $authUser;
  45.     /** @var string $authPass       Password for authentication */
  46.     private $authPass;
  47.     /** @var int $keepAlive         Link keepalive time */
  48.     private $keepAlive;
  49.     /** @var array $messageQueue    Messages published with QoS 1 are placed here, until they are confirmed */
  50.     private $messageQueue;
  51.     /**
  52.      * Class constructor
  53.      *
  54.      * @param string $address Address of the broker
  55.      * @param string $port Port on the broker
  56.      * @param string $clientID clientID for the broker
  57.      */
  58.     function __construct($address$port$clientID)
  59.     {
  60.         $this->connMethod "tcp";
  61.         $this->socket null;
  62.         $this->serverAddress null;
  63.         $this->serverPort null;
  64.         $this->clientID null;
  65.         $this->authUser null;
  66.         $this->authPass null;
  67.         $this->keepAlive 15;
  68.         $this->packet 1;
  69.         $this->topics = [];
  70.         $this->messageQueue = [];
  71.         $this->verifyPeerName true;
  72.         // Basic validation of clientid
  73.         if( preg_match("/[^0-9a-zA-Z]/",$clientID) ) {
  74.             error_log("ClientId can only contain characters 0-9,a-z,A-Z");
  75.             return;
  76.         }
  77.         if( strlen($clientID) > 23 ) {
  78.             error_log("ClientId max length is 23 characters/numbers");
  79.             return;
  80.         }
  81.         //
  82.         $this->serverAddress $address;
  83.         $this->serverPort $port;
  84.         $this->clientID $clientID;
  85.     }
  86.     /**
  87.      * Class destructor
  88.      */
  89.     function __destruct()
  90.     {
  91.         if($this->socket)
  92.             $this->close();
  93.     }
  94.     /**
  95.      * Try to connect to broker
  96.      *
  97.      * @param boolean $clean Is this connection clean?
  98.      *
  99.      * @return boolean Returns false if connection failed
  100.      */
  101.     function connect($clean true)
  102.     {
  103.         // Don't do anything, if server address is not set
  104.         if(!$this->serverAddress) {
  105.             return false;
  106.         }
  107.         // Is encryption enabled?
  108.         if($this->connMethod!="tcp") {
  109.             $socketContextOptions = [ "ssl" => [] ];
  110.             $socketContextOptions["ssl"]["verify_peer_name"] = $this->verifyPeerName;
  111.             if($this->caFile) {
  112.                 $socketContextOptions["ssl"]["cafile"]=$this->caFile;
  113.             }
  114.             if($this->clientCrt) {
  115.                 $socketContextOptions["ssl"]["local_cert"]=$this->clientCrt;
  116.             }
  117.             if ($this->clientCrt && $this->clientKey) {
  118.                 $socketContextOptions["ssl"]["local_pk"]=$this->clientKey;
  119.             }
  120.             $socketContext stream_context_create($socketContextOptions);
  121.             $host $this->connMethod "://" $this->serverAddress ":" $this->serverPort;
  122.             $this->socket stream_socket_client(
  123.                 $host$errno$errstr60STREAM_CLIENT_CONNECT$socketContext);
  124.             $this->debugMessage("Connecting to: ".$host);
  125.         } else {
  126.             $host $this->connMethod "://" $this->serverAddress ":" $this->serverPort;
  127.             $this->socket stream_socket_client(
  128.                 $host$errno$errstr60STREAM_CLIENT_CONNECT );
  129.             $this->debugMessage("Connecting to: ".$host);
  130.         }
  131.         //
  132.         if(!$this->socket) {
  133.             $this->socket null;
  134.             $this->debugMessage("Connection failed.. $errno , $errstr");
  135.             return false;
  136.         }
  137.         //
  138.         stream_set_timeout($this->socket10);
  139.         stream_set_blocking($this->socketfalse);
  140.         $bytes 0;
  141.         $buffer "";
  142.         // ------------------------------------
  143.         // Calculate connect flags
  144.         $var $clean?2:0;
  145. /*
  146.         if($this->will != NULL)
  147.         {
  148.             $var += 4;                              // Set will flag
  149.             $var += ($this->will['qos'] << 3);      // Set will qos
  150.             if($this->will['retain'])
  151.                 $var += 32;                     // Set will retain
  152.         }
  153. */
  154.         if($this->authPass != NULL) {
  155.             $var += 64;
  156.         }
  157.         if($this->authUser != NULL) {
  158.             $var += 128;
  159.         }
  160.         // ------------------------------------
  161.         // Create CONNECT packet (for MQTT 3.1.1 protocol)
  162.         $buffer .= $this->convertString("MQTT"$bytes);
  163.         $buffer .= chr(0x04);   // Protocol level
  164.         $bytes++;
  165.         $buffer .= chr($var);   // Connect flags
  166.         $bytes++;
  167.         $buffer .= chr($this->keepAlive >> 8);  // Keepalive (MSB)
  168.         $bytes++;
  169.         $buffer .= chr($this->keepAlive 0xff);    // Keepalive (LSB)
  170.         $bytes++;
  171.         $buffer .= $this->convertString($this->clientID,$bytes);
  172.         //Adding will to payload
  173.         /*
  174.         if($this->will != NULL){
  175.             $buffer .= $this->strwritestring($this->will['topic'],$bytes);
  176.             $buffer .= $this->strwritestring($this->will['content'],$bytes);
  177.         }
  178.         */
  179.         if($this->authUser) {
  180.             $buffer .= $this->convertString($this->authUser,$bytes);
  181.         }
  182.         if($this->authPass) {
  183.             $buffer .= $this->convertString($this->authPass,$bytes);
  184.         }
  185.         $header $this->createHeader0x10 $bytes );
  186.         fwrite($this->socket$headerstrlen($header));
  187.         fwrite($this->socket$buffer);
  188.         // Wait for CONNACK packet
  189.         $string $this->readBytes(4false);
  190.         if(strlen($string)!=4) {
  191.             $this->debugMessage("Connection failed! Server gave unexpected response (".strlen($string)." bytes).");
  192.             return false;
  193.         }
  194.         if(ord($string{0}) == 0x20 && $string{3} == chr(0)) {
  195.             $this->debugMessage("Connected to MQTT");
  196.         } else {
  197.             $msg sprintf("Connection failed! Error: 0x%02x 0x%02x",
  198.                 ord($string{0}),ord($string{3}) );
  199.             $this->debugMessage($msg);
  200.             return false;
  201.         }
  202.         $this->timeSincePingReq time();
  203.         $this->timeSincePingResp time();
  204.         return true;
  205.     }
  206.     /**
  207.      * Sets client crt and key files for client-side authentication
  208.      *
  209.      * @param string $crtFile Client certificate file
  210.      * @param string $keyFile Client key file
  211.      */
  212.     function setClientCert($crtFile$keyFile)
  213.     {
  214.         if(!file_exists($crtFile)) {
  215.             $this->debugMessage("Client certificate not found");
  216.             return;
  217.         }
  218.         if(!file_exists($keyFile)) {
  219.             $this->debugMessage("Client key not found");
  220.             return;
  221.         }
  222.         $this->clientCrt $crtFile;
  223.         $this->clientKey $keyFile;
  224.     }
  225.     /**
  226.      * Sets CAfile which is used to identify server
  227.      *
  228.      * @param string $caFile Client certificate file
  229.      */
  230.     function setCAFile($caFile)
  231.     {
  232.         if(!file_exists($caFile)) {
  233.             $this->debugMessage("CA file not found");
  234.             return;
  235.         }
  236.         $this->caFile $caFile;
  237.     }
  238.     /**
  239.      * Sets authentication details
  240.      *
  241.      * @param string $username Username
  242.      * @param string $password Password
  243.      */
  244.     function setAuthDetails($username$password)
  245.     {
  246.         $this->authUser $username;
  247.         $this->authPass $password;
  248.     }
  249.     //SH 2021.03.24 //
  250.     function setParamServer($server$port$clientID)
  251.     {
  252.         $this->serverAddress $server;
  253.         $this->serverPort $port;
  254.         $this->clientID $clientID;
  255.     }
  256.     //fin SH //
  257.     /**
  258.      * Enables TLS connection and sets crypto protocol
  259.      * Valid values: ssl, tls, tlsv1.0, tlsv1.1, tlsv1.2
  260.      *
  261.      * See this page for more info on values: http://php.net/manual/en/migration56.openssl.php
  262.      * and also this page: https://wiki.php.net/rfc/improved-tls-constants
  263.      *
  264.      * @param string $protocol Set encryption protocol
  265.      */
  266.     function setCryptoProtocol($protocol)
  267.     {
  268.         if(!in_array($protocol, ["ssl","tls","tlsv1.1","tlsv1.2"])) {
  269.             return;
  270.         }
  271.         $this->connMethod $protocol;
  272.     }
  273.     /**
  274.      * Loop to process data packets
  275.      */
  276.     function eventLoop()
  277.     {
  278.         // Socket not connected at all?
  279.         if($this->socket == null) {
  280.             return;
  281.         }
  282.         // Server closed connection?
  283.         if(feof($this->socket)) {
  284.             stream_socket_shutdown($this->socketSTREAM_SHUT_RDWR);
  285.             $this->socket null;
  286.             return;
  287.         }
  288.         // See if there's data waiting?
  289.         $byte $this->readBytes(1true);
  290.         if(strlen($byte) > 0) {
  291.             $cmd ord($byte);
  292.             // Read the length of packet..
  293.             $bytes=0;
  294.             $multiplier=1;
  295.             do {
  296.                 $t_byte ord($this->readBytes(1,true));
  297.                 $bytes+=($t_byte 127)*$multiplier;
  298.                 $multiplier*=128;
  299.                 if($multiplier>128*128*128) {
  300.                     break;
  301.                 }
  302.             } while( ($t_byte&128) != 0);
  303.             //
  304.             $payload "";
  305.             if($bytes>0)
  306.                 $payload $this->readBytes($bytesfalse);
  307.             switch( $cmd 0xf0 ) {
  308.                 case 0xd0:      // PINGRESP
  309.                     $this->debugMessage("Ping response received");
  310.                     break;
  311.                 case 0x30:      // PUBLISH
  312.                     $msg_qos = ( $cmd 0x06 ) >> 1// QoS = bits 1 & 2
  313.                     $this->processMessage$payload$msg_qos );
  314.                     break;
  315.                 case 0x40:      // PUBACK
  316.                     $msg_qos = ( $cmd 0x06 ) >> 1// QoS = bits 1 & 2
  317.                     $this->processPubAck$payload$msg_qos );
  318.                     break;
  319.             }
  320.             $this->timeSincePingReq time();
  321.             $this->timeSincePingResp time();
  322.         }
  323.         if( $this->timeSincePingReq < (time() - $this->keepAlive ) ) {
  324.             $this->debugMessage("Nothing received for a while, pinging..");
  325.             $this->sendPing();
  326.         }
  327.         if( $this->timeSincePingResp < (time() - ($this->keepAlive*2)) ) {
  328.             $this->debugMessage("Not seen a package in a while, reconnecting..");
  329.             stream_socket_shutdown($this->socketSTREAM_SHUT_RDWR);
  330.             $this->socket null;
  331.         }
  332.     }
  333.     /**
  334.      * Subscribe to given MQTT topics
  335.      *
  336.      * @param array $topics Topics to subscribe to
  337.      *
  338.      * @return boolean Did subscribe work or not?
  339.      */
  340.     function subscribe($topics)
  341.     {
  342.         // This will fail, if socket is not connected
  343.         if(!$this->socket) {
  344.             $this->debugMessage("Subscribe failed, because socket is not connected");
  345.             return false;
  346.         }
  347.         //
  348.         $cnt 2;
  349.         // Create payload starting with packet ID
  350.         $payload chr($this->packet >> 8) . chr($this->packet 0xff);
  351.         // If for some reason topic is provided as string, convert it to array
  352.         // If $topics is neither array nor string, refuse to continue
  353.         if( !is_array($topics) && is_string($topics) ) {
  354.             $topics = [ $topics => [ 'qos' => ] ];
  355.         } elseif( !is_array($topics) ) {
  356.             return false;
  357.         }
  358.         //
  359.         $numOfTopics 0;
  360.         foreach($topics as $topic=>$data) {
  361.             // Topic data in wrong format?
  362.             if( !is_array($data) || !isset($data['qos']) ) {
  363.                 continue;
  364.             }
  365.             //
  366.             $payload.=$this->convertString($topic$cnt);
  367.             $payload.=chr($data['qos']); $cnt++;
  368.             $this->topics[$topic]=$data;
  369.             $numOfTopics++;
  370.         }
  371.         // If number of subscribed topics is 0, don't send the request
  372.         if($numOfTopics == 0) {
  373.             return false;
  374.         }
  375.         // Send SUBSCRIBE header & payload
  376.         $header chr(0x82).chr($cnt);
  377.         fwrite($this->socket$header2);
  378.         fwrite($this->socket$payload$cnt);
  379.         // Wait for SUBACK packet
  380.         $resp_head $this->readBytes(2false);
  381.         if( strlen($resp_head) != || ord($resp_head{0}) != 0x90 ) {
  382.             $this->debugMessage("Invalid SUBACK packet received (stage 1)");
  383.             return false;
  384.         }
  385.         // Read remainder of the response
  386.         $bytes ord($resp_head{1});
  387.         $resp_body $this->readBytes($bytesfalse);
  388.         if( strlen($resp_body) < ) {
  389.             $this->debugMessage("Invalid SUBACK packet received (stage 2)");
  390.             return false;
  391.         }
  392.         $package_id = ( ord($resp_body{0}) << ) + ord($resp_body{1});
  393.         if( $this->packet != $package_id ) {
  394.             $this->debugMessage("SUBACK packet received for wrong message");
  395.             return false;
  396.         }
  397.         // FIXME: Process the rest of the SUBACK payload
  398.         //
  399.         $this->packet++;
  400.         return true;
  401.     }
  402.     /**
  403.      * Closes connection to server by first sending DISCONNECT packet, and
  404.      * then closing the stream socket
  405.      */
  406.     function close()
  407.     {
  408.         if(!$this->socket) {
  409.             return;
  410.         }
  411.         $this->sendDisconnect();
  412.         stream_socket_shutdown($this->socketSTREAM_SHUT_RDWR);
  413.         $this->socket null;
  414.     }
  415.     /**
  416.      * Sets verbosity level of messages
  417.      *
  418.      * @param int $level Verbosity level (0: silent , 1: verbose, 2: debug)
  419.      */
  420.     function setVerbose($level)
  421.     {
  422.         if ( $level == || $level == ) {
  423.             $this->debug true;
  424.         }
  425.     }
  426.     /**
  427.      * Gets queue of qos 1 messages that haven't been acknowledged by server
  428.      */
  429.     function getMessageQueue()
  430.     {
  431.         return $this->messageQueue;
  432.     }
  433.     /**
  434.      * Publish message to server
  435.      *
  436.      * @param string $topic Topic to which message is published to
  437.      * @param string $message Message to publish
  438.      * @param int $qos QoS of message (0/1)
  439.      * @param int $retain If set to 1 , server will try to retain the message
  440.      *
  441.      * @return boolean Did publish work or not
  442.      */
  443.     function publish($topic$message$qos$retain 0)
  444.     {
  445.         // Do nothing, if socket isn't connected
  446.         if(!$this->socket) {
  447.             $this->debugMessage("Packet NOT sent, socket not connected! (QoS: ".$qos." ; topic: ".$topic." ; msg: ".$message.")",2);
  448.             return false;
  449.         }
  450.         // Sanity checks for QoS and retain values
  451.         if( ( $qos != && $qos != ) || ($retain != && $retain != ) ) {
  452.             $this->debugMessage("Packet NOT sent, invalid qos/retain value (QoS: ".$qos." ; topic: ".$topic." ; msg: ".$message.")",2);
  453.             return false;
  454.         }
  455.         //
  456.         $bytes 0;
  457.         $payload $this->convertString($topic,$bytes);
  458.         // Add message identifier to QoS (1/2) packages
  459.         if($qos ) {
  460.             $payload .= chr($this->packet >> 8) . chr($this->packet 0xff);
  461.             $bytes+=2;
  462.         }
  463.         // Add Message to package and create header
  464.         $payload .= $message;
  465.         $bytes += strlen($message);
  466.         $header $this->createHeader0x30 + ($qos<<1) + $retain $bytes );
  467.         //
  468.         fwrite($this->socket$headerstrlen($header));
  469.         fwrite($this->socket$payload$bytes);
  470.         // If message QoS = 1 , add message to queue
  471.         if($qos==1) {
  472.             $this->messageQueue$this->packet ] = [
  473.                 "topic" => $topic,
  474.                 "message" => $message,
  475.                 "qos" => $qos,
  476.                 "retain" => $retain,
  477.                 "time" => time(),
  478.                 "attempt" => 1
  479.             ];
  480.         }
  481.         //
  482.         $this->debugMessage("Packet sent (QoS: ".$qos." ; topic: ".$topic." ; bytes: ".$bytes." ; msg: ".$message.")",2);
  483.         //
  484.         $this->packet++;
  485.         return true;
  486.     }
  487.     /**
  488.      * Process puback messages sent by server
  489.      *
  490.      * @param string $msg Message
  491.      * @param int $qos QoS of message
  492.      */
  493.     private function processPubAck ($payload$qos)
  494.     {
  495.         if( strlen($payload) < ) {
  496.             $this->debugMessage("Malformed PUBACK package received");
  497.             return false;
  498.         }
  499.         $package_id = ( ord($payload{0}) << ) + ord($payload{1});
  500.         if( !isset($this->messageQueue[$package_id]) ) {
  501.             $this->debugMessage("Received PUBACK for package we didn't sent?");
  502.             return false;
  503.         }
  504.         unset( $this->messageQueue[$package_id] );
  505.     }
  506.     /**
  507.      * Process publish messages sent by server
  508.      *
  509.      * @param string $msg Message
  510.      * @param int $qos QoS of message
  511.      */
  512.     private function processMessage($msg$qos)
  513.     {
  514.         // Package starts with topic
  515.         $tlen = (ord($msg{0})<<8) + ord($msg{1});
  516.         $msg_topic substr($msg,2,$tlen);
  517.         // QoS 1 and 2 packets also contain identifier
  518.         $msg_id null;
  519.         if($qos == 0) {
  520.             $msg substr($msg,$tlen+2);
  521.         } else {
  522.             $msg_id substr($msg,$tlen+2,2);
  523.             $msg substr($msg,$tlen+4);
  524.         }
  525.         // Then comes the message itself
  526.         $found false;
  527.         foreach($this->topics as $topic=>$data) {
  528.             $t_topic str_replace("+","[^/]*"$topic);
  529.             $t_topic str_replace("/","\/",$t_topic);
  530.             $t_topic str_replace("$","\$",$t_topic);
  531.             $t_topic str_replace("#",".*",$t_topic);
  532.             if(!preg_match("/^".$t_topic."$/"$msg_topic)) {
  533.                 continue;
  534.             }
  535.             $found true;
  536.             $this->debugMessage("Packet received (QoS: ".$qos." ; topic: ".$msg_topic." ; msg: ".$msg.")",2);
  537.             // Is callback for this topic set?
  538.             if(isset($data["function"]) && is_callable($data["function"])) {
  539.                 call_user_func($data["function"], $msg_topic$msg$qos);
  540.             }
  541.         }
  542.         //
  543.         if(!$found) {
  544.             $this->debugMessage("Package received, but it doesn't match subscriptions");
  545.         }
  546.         // QoS 1 package requires PUBACK packet
  547.         if($qos==1) {
  548.             $this->debugMessage("Packet with QoS 1 received, sending PUBACK");
  549.             $payload chr(0x40).chr(0x02).$msg_id;
  550.             fwrite($this->socket$payload4);
  551.         }
  552.         // QoS 2 package requires PUBRECT packet, but we won't give it :)
  553.         if($qos==2) {
  554.             // FIXME
  555.             $this->debugMessage("Packet with QoS 2 received, but feature is not implemented");
  556.         }
  557.     }
  558.     /**
  559.      * Handles debug messages
  560.      *
  561.      * @param string $msg Message
  562.      * @param int $level Logging level
  563.      */
  564.     private function debugMessage($msg$level=1)
  565.     {
  566.         if(!$this->debug || $this->debug $level) {
  567.             return;
  568.         }
  569.         echo "libmqtt: ".$msg."\n";
  570.     }
  571.     /**
  572.      * Create MQTT header, with command and length
  573.      *
  574.      * @param int $cmd Command to send
  575.      * @param int $bytes Number of bytes in the package
  576.      *
  577.      * @return string Header to send
  578.      */
  579.     private function createHeader($cmd$bytes)
  580.     {
  581.         $retval chr($cmd);
  582.         $bytes_left $bytes;
  583.         do {
  584.             $byte $bytes_left 128;
  585.             $bytes_left >>= 7;
  586.             if($bytes_left>0) {
  587.               $byte $byte 0x80;
  588.             }
  589.             $retval.=chr($byte);
  590.         } while($bytes_left>0);
  591.         return $retval;
  592.     }
  593.     /**
  594.      * Writes given string to MQTT string format
  595.      *
  596.      * @param string $data String to convert
  597.      * @param int $cnt Reference to length counter
  598.      *
  599.      * @return string String in MQTT format
  600.      */
  601.     private function convertString($data, &$cnt)
  602.     {
  603.         $len strlen($data);
  604.         $cnt+=$len+2;
  605.         $retval chr($len>>8).chr($len&0xff).$data;
  606.         return $retval;
  607.     }
  608.     /**
  609.      * Read x bytes from socket
  610.      *
  611.      * @param int $bytes Number of bytes to read
  612.      * @param boolean $noBuffer If true, use only direct fread
  613.      */
  614.     private function readBytes($bytes$noBuffer)
  615.     {
  616.         if(!$this->socket) {
  617.             return "";
  618.         }
  619.         if($noBuffer) {
  620.             return fread($this->socket$bytes);
  621.         }
  622.         $bytes_left $bytes;
  623.         $retval "";
  624.         while( !feof($this->socket) && $bytes_left ) {
  625.             $res fread($this->socket$bytes_left);
  626.             $retval.=$res;
  627.             $bytes_left-=strlen($res);
  628.         }
  629.         return $retval;
  630.     }
  631.     /**
  632.      * Sends PINGREQ packet to server
  633.      */
  634.     private function sendPing()
  635.     {
  636.         $this->timeSincePingReq time();
  637.         $payload chr(0xc0).chr(0x00);
  638.         fwrite($this->socket$payload2);
  639.         $this->debugMessage("PING sent");
  640.     }
  641.     /**
  642.      * Sends DISCONNECT packet to server
  643.      */
  644.     private function sendDisconnect()
  645.     {
  646.         if(!$this->socket || feof($this->socket))
  647.             return;
  648.         $payload chr(0xe0).chr(0x00);
  649.         fwrite($this->socket$payload2);
  650.         $this->debugMessage("DISCONNECT sent");
  651.     }
  652. }