Bringing together the Apache Cassandra experts from the community and DataStax.

Want to learn? Have a question? Want to share your expertise? You are in the right place!

Not sure where to begin? Getting Started



sguzzo avatar image
sguzzo asked ds-steven-matison commented

Why is the data streamed using the Java client appear as encoded on Apache Pulsar?

I'm trying to stream some data from a Java app into an Apache-Pulsar cluster. The issue that I'm facing is that the data appears to be encoded. .e.g.


just the parameters that I send as strings are correct apparently.

My producer code is build this way:

PulsarClient client = PulsarClient.builder()

Producer<DavisMessage> producer = client.newProducer(Schema.AVRO(DavisMessage.class))

Timestamp timestamp = new Timestamp(rec.getTimestamp().getTime());
final String formattedtimestamp = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")

float lon = Float.parseFloat((prop.getProperty("sensor.longitude")));
float lat = Float.parseFloat((prop.getProperty("sensor.latitude")));
float alt = Float.parseFloat((prop.getProperty("sensor.altitude")));
log.fine("Sending message to Pulsar.");
                .temp_out((float) rec.getOutsideTemperature())
                .temp_in((float) rec.getInsideTemperature())
                .hum_out((short) rec.getOutsideHumidity())
                .hum_in((short) rec.getInsideHumidity())
                .rain((float) rec.getRainFall())
                .rain_rate((float) rec.getRainRateHigh())
                .wind_avg((float) rec.getWindSpeedAvg())
                .wind_dir((short) rec.getWindDirection())
                .wind_high((float) rec.getWindSpeedHigh())
                .solar((short) rec.getSolarRadiation())
                .uv((float) rec.getUvIndex())

while my schema is

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;

//TODO understand why it can't be static
public class DavisMessage {
    String uuid;
    float latitude;
    float longitude;
    float altitude;
    String ts;
    float temp_out;
    float temp_in;
    int hum_out;
    int hum_in;
    int barometer;
    float rain;
    float rain_rate;
    float wind_avg;
    int wind_dir;
    float wind_high;
    int solar;
    float uv;
astra streaming
10 |1000

Up to 8 attachments (including images) can be used with a maximum of 1.0 MiB each and 10.0 MiB total.

sguzzo avatar image
sguzzo answered

The issue was that I wasn't using the same AVRO schema on the consumer side, therefore it was appearing as encoded.

10 |1000

Up to 8 attachments (including images) can be used with a maximum of 1.0 MiB each and 10.0 MiB total.

ds-steven-matison avatar image
ds-steven-matison answered ds-steven-matison commented

Can you confirm the payload you are delivering to .newMessage() is correct?

Also share the client you are using to read the message? I assume this is where you see the encoded payload. Want to make sure that also has a schema.

This repo is a full example of produce/consume that may be helpful. My schema is structure different and may help too.

2 comments Share
10 |1000

Up to 8 attachments (including images) can be used with a maximum of 1.0 MiB each and 10.0 MiB total.

Hi @ds-steven-matison , thank you for your answer!

My issue was deriving by the fact that I wasn't using the same schema for deserializing on the consumer's side. I was using a java client for the producer and a python one for the consumer, with an AVRO schema.

1 Like 1 ·

Excellent news, i was pretty sure it was just a schema conflict. Glad you worked through it successfully!

0 Likes 0 ·