How to retrieve messages from the Alpakka MQTT Streaming client?

sbt

// Single Akka version shared by every com.typesafe.akka module declared below.
val AkkaVersion = "2.6.14"

// Alpakka MQTT Streaming connector.
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mqtt-streaming" % "3.0.4"

// Akka modules, all pinned to AkkaVersion.
libraryDependencies ++= Seq("akka-stream", "akka-actor-typed")
  .map(module => "com.typesafe.akka" %% module % AkkaVersion)

Maven

<properties>
  <!-- Akka version applied to the akka-stream and akka-actor-typed dependencies below. -->
  <akka.version>2.6.14</akka.version>
  <!-- Scala binary version, appended as a suffix to every artifactId below. -->
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
  <!-- Alpakka MQTT Streaming connector (version set explicitly, not via a property). -->
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-mqtt-streaming_${scala.binary.version}</artifactId>
    <version>3.0.4</version>
  </dependency>
  <!-- Akka Streams, pinned to ${akka.version}. -->
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <!-- Akka typed actors, pinned to the same ${akka.version}. -->
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor-typed_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>

Gradle

// Versions shared by the dependency coordinates below.
def versions = [AkkaVersion: "2.6.14", ScalaBinary: "2.12"]

dependencies {
  // Alpakka MQTT Streaming connector.
  implementation "com.lightbend.akka:akka-stream-alpakka-mqtt-streaming_${versions.ScalaBinary}:3.0.4"

  // Akka modules, all built for the same Scala binary version and pinned to the same Akka version.
  ["akka-stream", "akka-actor-typed"].each { module ->
    implementation "com.typesafe.akka:${module}_${versions.ScalaBinary}:${versions.AkkaVersion}"
  }
}

Dependency tree

com.typesafe.akka    akka-actor-typed_2.12    2.6.14    Apache-2.0
    com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
        com.typesafe    config    1.4.0    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    com.typesafe.akka    akka-slf4j_2.12    2.6.14    Apache-2.0
        com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.slf4j    slf4j-api    1.7.30
    org.scala-lang    scala-library    2.12.11    Apache-2.0
    org.slf4j    slf4j-api    1.7.30
com.typesafe.akka    akka-stream-typed_2.12    2.6.14    Apache-2.0
    com.typesafe.akka    akka-actor-typed_2.12    2.6.14    Apache-2.0
        com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.12    2.6.14    Apache-2.0
            com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                    org.scala-lang    scala-library    2.12.11    Apache-2.0
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.slf4j    slf4j-api    1.7.30
        org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.slf4j    slf4j-api    1.7.30
    com.typesafe.akka    akka-stream_2.12    2.6.14    Apache-2.0
        com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.12    2.6.14    Apache-2.0
        com.typesafe    ssl-config-core_2.12    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    org.scala-lang    scala-library    2.12.11    Apache-2.0
com.typesafe.akka    akka-stream_2.12    2.6.14    Apache-2.0
    com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
        com.typesafe    config    1.4.0    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.12    2.6.14    Apache-2.0
    com.typesafe    ssl-config-core_2.12    0.4.2    Apache-2.0
        com.typesafe    config    1.4.0    Apache-2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    org.reactivestreams    reactive-streams    1.0.3    CC0
    org.scala-lang    scala-library    2.12.11    Apache-2.0
org.scala-lang    scala-library    2.12.11    Apache-2.0

Scala

// NOTE(review): this snippet shows no imports and no ActorSystem; it presumably relies on an
// implicit ActorSystem being in scope — confirm against the surrounding application code.

// Session settings built with no overrides.
val settings = MqttSessionSettings()
// Client session created from those settings.
val session = ActorMqttClientSession(settings)

// Outgoing TCP connection to localhost on port 1883.
val connection = Tcp().outgoingConnection("localhost", 1883)

// Joins the client session flow with the TCP connection. Per the declared type, the resulting
// flow accepts Command elements and emits either a decode error (Left) or an Event (Right).
// ByteString("1") is presumably the connection id expected by clientSessionFlow — confirm
// against the Alpakka API.
val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
  Mqtt
    .clientSessionFlow(session, ByteString("1"))
    .join(connection)

Read more here: Source link