From f5ccc868f9d7daa9082b43eba1b4335347345f88 Mon Sep 17 00:00:00 2001 From: ynn Date: Sun, 24 Feb 2019 17:46:56 +0100 Subject: [PATCH] Initial commit --- .gitignore | 234 ++++++++++++++++++ README.md | 74 ++++++ pom.xml | 49 ++++ run-docker.sh | 0 src/main/java/fr/istic/date/EnvoyerDate.java | 5 + src/main/java/fr/istic/date/RecevoirDate.java | 5 + src/main/java/rabbitmq/tuto1/Recv.java | 28 +++ src/main/java/rabbitmq/tuto1/Send.java | 22 ++ src/main/java/rabbitmq/tuto2/NewTask.java | 27 ++ src/main/java/rabbitmq/tuto2/Worker.java | 50 ++++ src/main/java/rabbitmq/tuto3/EmitLog.java | 26 ++ src/main/java/rabbitmq/tuto3/ReceiveLogs.java | 30 +++ .../java/rabbitmq/tuto4/EmitLogDirect.java | 57 +++++ .../rabbitmq/tuto4/ReceiveLogsDirect.java | 36 +++ .../java/rabbitmq/tuto5/EmitLogTopic.java | 57 +++++ .../java/rabbitmq/tuto5/ReceiveLogsTopic.java | 40 +++ src/main/java/rabbitmq/tuto6/RPCClient.java | 70 ++++++ src/main/java/rabbitmq/tuto6/RPCServer.java | 74 ++++++ 18 files changed, 884 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pom.xml create mode 100644 run-docker.sh create mode 100644 src/main/java/fr/istic/date/EnvoyerDate.java create mode 100644 src/main/java/fr/istic/date/RecevoirDate.java create mode 100644 src/main/java/rabbitmq/tuto1/Recv.java create mode 100644 src/main/java/rabbitmq/tuto1/Send.java create mode 100644 src/main/java/rabbitmq/tuto2/NewTask.java create mode 100644 src/main/java/rabbitmq/tuto2/Worker.java create mode 100644 src/main/java/rabbitmq/tuto3/EmitLog.java create mode 100644 src/main/java/rabbitmq/tuto3/ReceiveLogs.java create mode 100644 src/main/java/rabbitmq/tuto4/EmitLogDirect.java create mode 100644 src/main/java/rabbitmq/tuto4/ReceiveLogsDirect.java create mode 100644 src/main/java/rabbitmq/tuto5/EmitLogTopic.java create mode 100644 src/main/java/rabbitmq/tuto5/ReceiveLogsTopic.java create mode 100644 src/main/java/rabbitmq/tuto6/RPCClient.java create mode 100644 src/main/java/rabbitmq/tuto6/RPCServer.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a181cc2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,234 @@ + +# Created by https://www.gitignore.io/api/osx,java,linux,maven,eclipse,intellij +# Edit at https://www.gitignore.io/?templates=osx,java,linux,maven,eclipse,intellij + +### Eclipse ### + +.metadata +bin/ +tmp/ +*.tmp +*.bak +*.swp +*~.nib +local.properties +.settings/ +.loadpath +.recommenders + +# External tool builders +.externalToolBuilders/ + +# Locally stored "Eclipse launch configurations" +*.launch + +# PyDev specific (Python IDE for Eclipse) +*.pydevproject + +# CDT-specific (C/C++ Development Tooling) +.cproject + +# CDT- autotools +.autotools + +# Java annotation processor (APT) +.factorypath + +# PDT-specific (PHP Development Tools) +.buildpath + +# sbteclipse plugin +.target + +# Tern plugin +.tern-project + +# TeXlipse plugin +.texlipse + +# STS (Spring Tool Suite) +.springBeans + +# Code Recommenders +.recommenders/ + +# Annotation Processing +.apt_generated/ + +# Scala IDE specific (Scala & Java development for Eclipse) +.cache-main +.scala_dependencies +.worksheet + +### Eclipse Patch ### +# Eclipse Core +.project + +# JDT-specific (Eclipse Java Development Tools) +.classpath + +# Annotation Processing +.apt_generated + +.sts4-cache/ + +### Intellij ### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/modules.xml +# .idea/*.iml +# .idea/modules + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### Intellij Patch ### +# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 + +# *.iml +# modules.xml +# .idea/misc.xml +# *.ipr + +# Sonarlint plugin +.idea/sonarlint + +### Java ### +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +### Linux ### +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +### Maven ### +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +.mvn/wrapper/maven-wrapper.jar + +### OSX ### +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +# End of https://www.gitignore.io/api/osx,java,linux,maven,eclipse,intellij diff --git a/README.md b/README.md new file mode 100644 index 0000000..ab8a62e --- /dev/null +++ b/README.md @@ -0,0 +1,74 @@ +# Squelette du TP + +## Utilisation + +Pour utiliser ce tp vous devez : + +1. le forker via le lien fork ci-dessus. +2. Votre repository doit être **privé**. Vous devez le partager avec votre encadrant de TP. +3. vous obtiendrez un nouveau lien : par exemple git@gitlab.istic.univ-rennes1.fr:monpseudo/.... +4. Dans le **workspace d'eclipse** faites `git clone git@gitlab.istic.univ-rennes1.fr:monpseudo/...` avec l'url précédente. +5. Ouvrez le workspace dans eclipse. +6. Utilisez Import> "Existing Maven Projects". +7. Selectionnez le projet et validez. + +Vous devriez avoir une version du projet dans votre propre compte gitlab. + +Vous pouvez le partager avec votre binome et l'encadrant de TP. + + +## Rapport + +Votre rapport doit être écrit ici en markdown. + +Vous trouverez la syntaxe de markdown ici : https://docs.gitlab.com/ee/user/markdown.html + +Placez vos images dans le répertoire images si nécessaire. + + + +## Faire des diagrammes + +En particulier vous pouvez utiliser [mermaid](https://mermaidjs.github.io/) : + + + +```mermaid +sequenceDiagram + participant Alice + participant Bob + Alice->John: Hello John, how are you? + loop Healthcheck + John->John: Fight against hypochondria + end + Note right of John: Rational thoughts
prevail... + John-->Alice: Great! + John->Bob: How about you? + Bob-->John: Jolly good! +``` + +## Insérer du code + +Insérer du `code` : + +```java +public interface ClientHandler { + public void handle(); +} +``` + +et des résultats : + +```bash +[yo@capybara dkgr]$ nc google.fr 80 +PWET / +HTTP/1.0 400 Bad Request +Content-Type: text/html; charset=UTF-8 +Referrer-Policy: no-referrer +Content-Length: 1555 +Date: Mon, 21 Jan 2019 12:18:02 GMT + + + + +``` diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..e5e1546 --- /dev/null +++ b/pom.xml @@ -0,0 +1,49 @@ + + 4.0.0 + + fr.istic.pr + pr.tp.mom + 0.0.1-SNAPSHOT + jar + + pr.tp.mom + http://maven.apache.org + + + UTF-8 + + + + + + com.rabbitmq + amqp-client + 5.6.0 + + + + + junit + junit + 3.8.1 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + 1.8 + 1.8 + + + + + + diff --git a/run-docker.sh b/run-docker.sh new file mode 100644 index 0000000..e69de29 diff --git a/src/main/java/fr/istic/date/EnvoyerDate.java b/src/main/java/fr/istic/date/EnvoyerDate.java new file mode 100644 index 0000000..3c9f262 --- /dev/null +++ b/src/main/java/fr/istic/date/EnvoyerDate.java @@ -0,0 +1,5 @@ +package fr.istic.date; + +public class EnvoyerDate { + +} diff --git a/src/main/java/fr/istic/date/RecevoirDate.java b/src/main/java/fr/istic/date/RecevoirDate.java new file mode 100644 index 0000000..fe8e3b1 --- /dev/null +++ b/src/main/java/fr/istic/date/RecevoirDate.java @@ -0,0 +1,5 @@ +package fr.istic.date; + +public class RecevoirDate { + +} diff --git a/src/main/java/rabbitmq/tuto1/Recv.java b/src/main/java/rabbitmq/tuto1/Recv.java new file mode 100644 index 0000000..74f7b0f --- /dev/null +++ b/src/main/java/rabbitmq/tuto1/Recv.java @@ -0,0 +1,28 @@ +package rabbitmq.tuto1; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +public class Recv { + + private final static String QUEUE_NAME = "hello"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + }; + channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { + }); + } +} diff --git a/src/main/java/rabbitmq/tuto1/Send.java b/src/main/java/rabbitmq/tuto1/Send.java new file mode 100644 index 0000000..e5ac3fa --- /dev/null +++ b/src/main/java/rabbitmq/tuto1/Send.java @@ -0,0 +1,22 @@ +package rabbitmq.tuto1; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class Send { + + private final static String QUEUE_NAME = "hello"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + String message = "Hello World!"; + channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + message + "'"); + } + } +} diff --git a/src/main/java/rabbitmq/tuto2/NewTask.java b/src/main/java/rabbitmq/tuto2/NewTask.java new file mode 100644 index 0000000..c4f4dec --- /dev/null +++ b/src/main/java/rabbitmq/tuto2/NewTask.java @@ -0,0 +1,27 @@ +package rabbitmq.tuto2; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.MessageProperties; + +public class NewTask { + + private static final String TASK_QUEUE_NAME = "task_queue"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); + + String message = String.join(" ", argv); + + channel.basicPublish("", TASK_QUEUE_NAME, + MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + message + "'"); + } + } + +} diff --git a/src/main/java/rabbitmq/tuto2/Worker.java b/src/main/java/rabbitmq/tuto2/Worker.java new file mode 100644 index 0000000..797f8c9 --- /dev/null +++ b/src/main/java/rabbitmq/tuto2/Worker.java @@ -0,0 +1,50 @@ +package rabbitmq.tuto2; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +public class Worker { + + private static final String TASK_QUEUE_NAME = "task_queue"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + final Connection connection = factory.newConnection(); + final Channel channel = connection.createChannel(); + + channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + channel.basicQos(1); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + + System.out.println(" [x] Received '" + message + "'"); + try { + doWork(message); + } finally { + System.out.println(" [x] Done"); + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + } + }; + channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, + consumerTag -> { + }); + } + + private static void doWork(String task) { + for (char ch : task.toCharArray()) { + if (ch == '.') { + try { + Thread.sleep(1000); + } catch (InterruptedException _ignored) { + Thread.currentThread().interrupt(); + } + } + } + } +} diff --git a/src/main/java/rabbitmq/tuto3/EmitLog.java b/src/main/java/rabbitmq/tuto3/EmitLog.java new file mode 100644 index 0000000..35e615e --- /dev/null +++ b/src/main/java/rabbitmq/tuto3/EmitLog.java @@ -0,0 +1,26 @@ +package rabbitmq.tuto3; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class EmitLog { + + private static final String EXCHANGE_NAME = "logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); + + String message = argv.length < 1 ? "info: Hello World!" + : String.join(" ", argv); + + channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + message + "'"); + } + } + +} diff --git a/src/main/java/rabbitmq/tuto3/ReceiveLogs.java b/src/main/java/rabbitmq/tuto3/ReceiveLogs.java new file mode 100644 index 0000000..d420f23 --- /dev/null +++ b/src/main/java/rabbitmq/tuto3/ReceiveLogs.java @@ -0,0 +1,30 @@ +package rabbitmq.tuto3; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +public class ReceiveLogs { + private static final String EXCHANGE_NAME = "logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, EXCHANGE_NAME, ""); + + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + }; + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { + }); + } +} diff --git a/src/main/java/rabbitmq/tuto4/EmitLogDirect.java b/src/main/java/rabbitmq/tuto4/EmitLogDirect.java new file mode 100644 index 0000000..dfe0f10 --- /dev/null +++ b/src/main/java/rabbitmq/tuto4/EmitLogDirect.java @@ -0,0 +1,57 @@ +package rabbitmq.tuto4; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class EmitLogDirect { + + private static final String EXCHANGE_NAME = "direct_logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); + + String severity = getSeverity(argv); + String message = getMessage(argv); + + channel.basicPublish(EXCHANGE_NAME, severity, null, + message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); + } + } + + private static String getSeverity(String[] strings) { + if (strings.length < 1) { + return "info"; + } + return strings[0]; + } + + private static String getMessage(String[] strings) { + if (strings.length < 2) { + return "Hello World!"; + } + return joinStrings(strings, " ", 1); + } + + private static String joinStrings(String[] strings, String delimiter, + int startIndex) { + int length = strings.length; + if (length == 0) { + return ""; + } + if (length < startIndex) { + return ""; + } + StringBuilder words = new StringBuilder(strings[startIndex]); + for (int i = startIndex + 1; i < length; i++) { + words.append(delimiter).append(strings[i]); + } + return words.toString(); + } +} diff --git a/src/main/java/rabbitmq/tuto4/ReceiveLogsDirect.java b/src/main/java/rabbitmq/tuto4/ReceiveLogsDirect.java new file mode 100644 index 0000000..49c73eb --- /dev/null +++ b/src/main/java/rabbitmq/tuto4/ReceiveLogsDirect.java @@ -0,0 +1,36 @@ +package rabbitmq.tuto4; + +import com.rabbitmq.client.*; + +public class ReceiveLogsDirect { + + private static final String EXCHANGE_NAME = "direct_logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); + String queueName = channel.queueDeclare().getQueue(); + + if (argv.length < 1) { + System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); + System.exit(1); + } + + for (String severity : argv) { + channel.queueBind(queueName, EXCHANGE_NAME, severity); + } + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); + }; + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { + }); + } +} diff --git a/src/main/java/rabbitmq/tuto5/EmitLogTopic.java b/src/main/java/rabbitmq/tuto5/EmitLogTopic.java new file mode 100644 index 0000000..123d03c --- /dev/null +++ b/src/main/java/rabbitmq/tuto5/EmitLogTopic.java @@ -0,0 +1,57 @@ +package rabbitmq.tuto5; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class EmitLogTopic { + + private static final String EXCHANGE_NAME = "topic_logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + + channel.exchangeDeclare(EXCHANGE_NAME, "topic"); + + String routingKey = getRouting(argv); + String message = getMessage(argv); + + channel.basicPublish(EXCHANGE_NAME, routingKey, null, + message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); + } + } + + private static String getRouting(String[] strings) { + if (strings.length < 1) { + return "anonymous.info"; + } + return strings[0]; + } + + private static String getMessage(String[] strings) { + if (strings.length < 2) { + return "Hello World!"; + } + return joinStrings(strings, " ", 1); + } + + private static String joinStrings(String[] strings, String delimiter, + int startIndex) { + int length = strings.length; + if (length == 0) { + return ""; + } + if (length < startIndex) { + return ""; + } + StringBuilder words = new StringBuilder(strings[startIndex]); + for (int i = startIndex + 1; i < length; i++) { + words.append(delimiter).append(strings[i]); + } + return words.toString(); + } +} diff --git a/src/main/java/rabbitmq/tuto5/ReceiveLogsTopic.java b/src/main/java/rabbitmq/tuto5/ReceiveLogsTopic.java new file mode 100644 index 0000000..58c67e6 --- /dev/null +++ b/src/main/java/rabbitmq/tuto5/ReceiveLogsTopic.java @@ -0,0 +1,40 @@ +package rabbitmq.tuto5; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +public class ReceiveLogsTopic { + + private static final String EXCHANGE_NAME = "topic_logs"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.exchangeDeclare(EXCHANGE_NAME, "topic"); + String queueName = channel.queueDeclare().getQueue(); + + if (argv.length < 1) { + System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); + System.exit(1); + } + + for (String bindingKey : argv) { + channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); + } + + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); + }; + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { + }); + } +} diff --git a/src/main/java/rabbitmq/tuto6/RPCClient.java b/src/main/java/rabbitmq/tuto6/RPCClient.java new file mode 100644 index 0000000..ade61ef --- /dev/null +++ b/src/main/java/rabbitmq/tuto6/RPCClient.java @@ -0,0 +1,70 @@ +package rabbitmq.tuto6; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeoutException; + +public class RPCClient implements AutoCloseable { + + private Connection connection; + private Channel channel; + private String requestQueueName = "rpc_queue"; + + public RPCClient() throws IOException, TimeoutException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + + connection = factory.newConnection(); + channel = connection.createChannel(); + } + + public static void main(String[] argv) { + try (RPCClient fibonacciRpc = new RPCClient()) { + for (int i = 0; i < 32; i++) { + String i_str = Integer.toString(i); + System.out.println(" [x] Requesting fib(" + i_str + ")"); + String response = fibonacciRpc.call(i_str); + System.out.println(" [.] Got '" + response + "'"); + } + } catch (IOException | TimeoutException | InterruptedException e) { + e.printStackTrace(); + } + } + + public String call(String message) throws IOException, InterruptedException { + final String corrId = UUID.randomUUID().toString(); + + String replyQueueName = channel.queueDeclare().getQueue(); + AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() + .correlationId(corrId).replyTo(replyQueueName).build(); + + channel.basicPublish("", requestQueueName, props, + message.getBytes("UTF-8")); + + final BlockingQueue response = new ArrayBlockingQueue<>(1); + + String ctag = channel.basicConsume(replyQueueName, true, + (consumerTag, delivery) -> { + if (delivery.getProperties().getCorrelationId().equals(corrId)) { + response.offer(new String(delivery.getBody(), "UTF-8")); + } + }, consumerTag -> { + }); + + String result = response.take(); + channel.basicCancel(ctag); + return result; + } + + @Override + public void close() throws IOException { + connection.close(); + } +} diff --git a/src/main/java/rabbitmq/tuto6/RPCServer.java b/src/main/java/rabbitmq/tuto6/RPCServer.java new file mode 100644 index 0000000..1cc8515 --- /dev/null +++ b/src/main/java/rabbitmq/tuto6/RPCServer.java @@ -0,0 +1,74 @@ +package rabbitmq.tuto6; + +import com.rabbitmq.client.*; + +public class RPCServer { + + private static final String RPC_QUEUE_NAME = "rpc_queue"; + + private static int fib(int n) { + if (n == 0) { + return 0; + } + if (n == 1) { + return 1; + } + return fib(n - 1) + fib(n - 2); + } + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); + channel.queuePurge(RPC_QUEUE_NAME); + + channel.basicQos(1); + + System.out.println(" [x] Awaiting RPC requests"); + + Object monitor = new Object(); + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() + .correlationId(delivery.getProperties().getCorrelationId()).build(); + + String response = ""; + + try { + String message = new String(delivery.getBody(), "UTF-8"); + int n = Integer.parseInt(message); + + System.out.println(" [.] fib(" + message + ")"); + response += fib(n); + } catch (RuntimeException e) { + System.out.println(" [.] " + e.toString()); + } finally { + channel.basicPublish("", delivery.getProperties().getReplyTo(), + replyProps, response.getBytes("UTF-8")); + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + // RabbitMq consumer worker thread notifies the RPC server owner + // thread + synchronized (monitor) { + monitor.notify(); + } + } + }; + + channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, + (consumerTag -> { + })); + // Wait and be prepared to consume the message from RPC client. + while (true) { + synchronized (monitor) { + try { + monitor.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + } +} \ No newline at end of file