Bringing together the Apache Cassandra experts from the community and DataStax.



sguzzo asked ds-steven-matison commented

Why does data streamed with the Java client appear encoded in Apache Pulsar?

I'm trying to stream data from a Java app into an Apache Pulsar cluster. The problem is that the data appears to be encoded, e.g.:

 "\u0000\u0000\u0000\u0004\u0018l@\u0000\u0000\u0000�@�\u000fV�\u0001\u0000\u00006B\u0000\u0000�@\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000�\t���AUU�A\u0002(2021-10-04T14:00:00Z\u0002H88c8dc24-233c-45f5-b366-85382d7d52c6\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000"` 

Only the fields I send as strings (the timestamp and the UUID) appear to come through correctly.

My producer code is built this way:

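import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

// Connect to the cluster over TLS, authenticating with a token.
PulsarClient client = PulsarClient.builder()
                .serviceUrl(service_url)
                .tlsTrustCertsFilePath("/etc/ssl/certs/ca-certificates.crt")
                .authentication(AuthenticationFactory.token(token))
                .build();

// Schema.AVRO serializes each DavisMessage to Avro binary; consumers must
// read the topic with the same schema to get the fields back.
Producer<DavisMessage> producer = client.newProducer(Schema.AVRO(DavisMessage.class))
                .topic(topic)
                .create();

// The pattern ends in a literal 'Z' (UTC), so the formatter must use UTC too.
Timestamp timestamp = new Timestamp(rec.getTimestamp().getTime());
SimpleDateFormat isoUtc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
isoUtc.setTimeZone(TimeZone.getTimeZone("UTC"));
final String formattedtimestamp = isoUtc.format(timestamp);

// Sensor position, read from the sensor.* properties.
float lon = Float.parseFloat(prop.getProperty("sensor.longitude"));
float lat = Float.parseFloat(prop.getProperty("sensor.latitude"));
float alt = Float.parseFloat(prop.getProperty("sensor.altitude"));
log.fine("Sending message to Pulsar.");
// Build one weather record and send it synchronously.
producer.newMessage().value(DavisMessage.builder()
                .uuid(prop.getProperty("sensor.uuid"))
                .latitude(lat)
                .longitude(lon)
                .altitude(alt)
                .ts(formattedtimestamp)
                .temp_out((float) rec.getOutsideTemperature())
                .temp_in((float) rec.getInsideTemperature())
                .hum_out((short) rec.getOutsideHumidity())
                .hum_in((short) rec.getInsideHumidity())
                .barometer(rec.getBarometer())
                .rain((float) rec.getRainFall())
                .rain_rate((float) rec.getRainRateHigh())
                .wind_avg((float) rec.getWindSpeedAvg())
                .wind_dir((short) rec.getWindDirection())
                .wind_high((float) rec.getWindSpeedHigh())
                .solar((short) rec.getSolarRadiation())
                .uv((float) rec.getUvIndex())
                .build()).send();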
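With SimpleDateFormat, the quoted 'Z' in the pattern is only a literal suffix: unless the formatter's time zone is set to UTC, the time is written in the JVM's default zone and then labeled as UTC. A minimal java.time sketch of the same formatting, assuming (as the original `new Timestamp(rec.getTimestamp().getTime())` call suggests) that the record time is available as epoch milliseconds; the class and method names are hypothetical:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class IsoTimestamp {
    // Same layout as the producer's pattern, but pinned to UTC,
    // so the trailing 'Z' is always accurate.
    private static final DateTimeFormatter ISO_UTC =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
                    .withZone(ZoneOffset.UTC);

    // Hypothetical helper: formats epoch milliseconds, dropping sub-second digits.
    static String format(long epochMillis) {
        return ISO_UTC.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long millis = Instant.parse("2021-10-04T14:00:00Z").toEpochMilli();
        System.out.println(format(millis)); // prints 2021-10-04T14:00:00Z
    }
}
```

In the producer this would be called as `IsoTimestamp.format(rec.getTimestamp().getTime())`; the result does not depend on the machine's default time zone.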

My schema class is:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;

//TODO understand why it can't be static
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DavisMessage {
    String uuid;
    float latitude;
    float longitude;
    float altitude;
    String ts;
    float temp_out;
    float temp_in;
    int hum_out;
    int hum_in;
    int barometer;
    float rain;
    float rain_rate;
    float wind_avg;
    int wind_dir;
    float wind_high;
    int solar;
    float uv;
}
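On the `//TODO understand why it can't be static` note: Java does not allow a top-level class to be declared `static`; only nested classes can be. If the message class is nested inside another class, it should be a `static` nested class. A non-static inner class has no true no-argument constructor, because its constructor implicitly takes the enclosing instance, and reflection-based serializers (Avro's reflect reader among them, as far as I know) create records through a no-argument constructor. A small stdlib-only sketch of that difference, with hypothetical class names:

```java
class NestedClassDemo {
    // Static nested class: has a genuine no-argument constructor.
    static class StaticMessage {
        String uuid;
    }

    // Inner (non-static) class: its constructor implicitly takes a NestedClassDemo.
    class InnerMessage {
        String uuid;
    }

    // Tries to create an instance the way reflection-based deserializers do.
    static boolean canInstantiate(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // The inner class ends up here with NoSuchMethodException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(StaticMessage.class)); // true
        System.out.println(canInstantiate(InnerMessage.class));  // false
    }
}
```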

sguzzo answered

The issue was that I wasn't using the same Avro schema on the consumer side, so the data appeared encoded.
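Why only the strings were readable: in Avro's binary encoding a `float` is 4 raw little-endian IEEE 754 bytes and an `int` is a zig-zag variable-length number, whereas a `string` is a zig-zag length followed by its UTF-8 bytes. The payload in the question fits string fields written as a `["null","string"]` union: `\u0002` selects branch 1 (string), `(` (0x28 = 40) is the zig-zag encoding of length 20, and the 20-character timestamp follows; `H` (0x48 = 72) likewise encodes length 36 for the UUID. Avro binary carries no field names or type tags, so a reader needs the writer's schema to know where each field ends. The stdlib-only sketch below hand-encodes a few values following the Avro specification; it is illustrative only (class and method names are hypothetical) and not a substitute for the Avro or Pulsar libraries:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class AvroBinarySketch {
    // Avro int/long: zig-zag encode, then 7 bits per byte, low bits first.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro float: IEEE 754 bits as 4 bytes, little-endian.
    static void writeFloat(ByteArrayOutputStream out, float f) {
        int bits = Float.floatToIntBits(f);
        for (int i = 0; i < 4; i++) {
            out.write((bits >>> (8 * i)) & 0xFF);
        }
    }

    // Avro string: byte length as a long, then the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Union ["null","string"] holding a string: branch index 1, then the string.
    static void writeNullableString(ByteArrayOutputStream out, String s) {
        writeLong(out, 1);
        writeString(out, s);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeNullableString(out, "2021-10-04T14:00:00Z");
        writeFloat(out, 21.5f);
        byte[] bytes = out.toByteArray();
        // 0x02 = union branch 1, 0x28 '(' = length 20
        System.out.printf("%02x %02x%n", bytes[0], bytes[1]); // prints 02 28
        System.out.println(bytes.length); // prints 26 (1 + 1 + 20 + 4)
    }
}
```

The readable timestamp sits between opaque bytes exactly as in the question's payload, while the float contributes four bytes that only make sense to a reader that knows a `float` comes next.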


ds-steven-matison answered ds-steven-matison commented

Can you confirm the payload you are delivering to .newMessage() is correct?

Can you also share the client you are using to read the message? I assume that is where you see the encoded payload, and I want to make sure it also uses a schema.

This repo is a full produce/consume example that may be helpful. My schema is structured differently, which may help too.


https://github.com/ds-steven-matison/astra-connection-pulsar





Hi @ds-steven-matison, thank you for your answer!

My issue came from the fact that I wasn't using the same schema for deserialization on the consumer side. I was using a Java client for the producer and a Python client for the consumer, with an Avro schema.


Excellent news, I was pretty sure it was just a schema conflict. Glad you worked through it successfully!
