// Broker URI: first command-line argument, falling back to DFLT_BROKER.
let host = env::args().nth(1).unwrap_or_else(|| DFLT_BROKER.to_string());

// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
    .server_uri(host)
    .client_id(DFLT_CLIENT.to_string())
    .finalize();

// Create a client; on failure report the error and exit with status 1.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
    println!("Error creating the client: {:?}", err);
    process::exit(1);
});

// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
    .keep_alive_interval(Duration::from_secs(20))
    .clean_session(true)
    .finalize();

// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
    println!("Unable to connect:\n\t{:?}", e);
    process::exit(1);
}
// Define the set of options for the create. // Use an ID for a persistent session. letcreate_opts = mqtt::CreateOptionsBuilder::new() .server_uri(host) .client_id(DFLT_CLIENT.to_string()) .finalize();
// Create a client. letcli = mqtt::Client::new(create_opts).unwrap_or_else(|err| { println!("Error creating the client: {:?}", err); process::exit(1); });
// Define the set of options for the connection. letconn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(20)) .clean_session(true) .finalize();
// Connect and wait for it to complete or fail. ifletErr(e) = cli.connect(conn_opts) { println!("Unable to connect:\n\t{:?}", e); process::exit(1); }
// Create a message and publish it. // Publish message to 'test' and 'hello' topics. fornumin0..5 { letcontent = "Hello world! ".to_string() + &num.to_string(); letmut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS); if num % 2 == 0 { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]); msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS); } else { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]); } lettok = cli.publish(msg);
use std::{ env, process, thread, time::Duration };
externcrate paho_mqtt as mqtt;
// Broker URI used when none is given on the command line.
const DFLT_BROKER: &str = "tcp://broker.emqx.io:1883";
// Client identifier passed to the create options.
const DFLT_CLIENT: &str = "rust_subscribe";
// Topics this client works with.
const DFLT_TOPICS: &[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS: &[i32] = &[0, 1];
// Reconnect to the broker when connection is lost. fntry_reconnect(cli: &mqtt::Client) ->bool { println!("Connection lost. Waiting to retry connection"); for_in0..12 { thread::sleep(Duration::from_millis(5000)); if cli.reconnect().is_ok() { println!("Successfully reconnected"); returntrue; } } println!("Unable to reconnect after several attempts."); false }
// Define the set of options for the create. // Use an ID for a persistent session. letcreate_opts = mqtt::CreateOptionsBuilder::new() .server_uri(host) .client_id(DFLT_CLIENT.to_string()) .finalize();
// Create a client. letmut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| { println!("Error creating the client: {:?}", err); process::exit(1); });
// Initialize the consumer before connecting. letrx = cli.start_consuming();
// Define the set of options for the connection. letlwt = mqtt::MessageBuilder::new() .topic("test") .payload("Consumer lost connection") .finalize(); letconn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(20)) .clean_session(false) .will_message(lwt) .finalize();
// Connect and wait for it to complete or fail. ifletErr(e) = cli.connect(conn_opts) { println!("Unable to connect:\n\t{:?}", e); process::exit(1); }
// If still connected, then disconnect now. if cli.is_connected() { println!("Disconnecting"); cli.unsubscribe_many(DFLT_TOPICS).unwrap(); cli.disconnect(None).unwrap(); } println!("Exiting"); }