I'm trying to stream some data from a Java app into an Apache-Pulsar cluster. The issue that I'm facing is that the data appears to be encoded. .e.g.
"\u0000\u0000\u0000\u0004\u0018l@\u0000\u0000\u0000�@�\u000fV�\u0001\u0000\u00006B\u0000\u0000�@\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000�\t���AUU�A\u0002(2021-10-04T14:00:00Z\u0002H88c8dc24-233c-45f5-b366-85382d7d52c6\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000"`
just the parameters that I send as strings are correct apparently.
My producer code is build this way:
PulsarClient client = PulsarClient.builder() .serviceUrl(service_url) .tlsTrustCertsFilePath("/etc/ssl/certs/ca-certificates.crt") .authentication( AuthenticationFactory.token(token) ) .build(); Producer<DavisMessage> producer = client.newProducer(Schema.AVRO(DavisMessage.class)) .topic(topic) .create(); Timestamp timestamp = new Timestamp(rec.getTimestamp().getTime()); final String formattedtimestamp = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'") .format(timestamp); float lon = Float.parseFloat((prop.getProperty("sensor.longitude"))); float lat = Float.parseFloat((prop.getProperty("sensor.latitude"))); float alt = Float.parseFloat((prop.getProperty("sensor.altitude"))); log.fine("Sending message to Pulsar."); producer.newMessage().value(DavisMessage.builder() .uuid(prop.getProperty("sensor.uuid")) .latitude(lat) .longitude(lon) .altitude(alt) .ts(formattedtimestamp) .temp_out((float) rec.getOutsideTemperature()) .temp_in((float) rec.getInsideTemperature()) .hum_out((short) rec.getOutsideHumidity()) .hum_in((short) rec.getInsideHumidity()) .barometer(rec.getBarometer()) .rain((float) rec.getRainFall()) .rain_rate((float) rec.getRainRateHigh()) .wind_avg((float) rec.getWindSpeedAvg()) .wind_dir((short) rec.getWindDirection()) .wind_high((float) rec.getWindSpeedHigh()) .solar((short) rec.getSolarRadiation()) .uv((float) rec.getUvIndex()) .build()).send();
while my schema is
import lombok.AllArgsConstructor; import lombok.Builder; import lombok.NoArgsConstructor; //TODO understand why it can't be static @Builder @AllArgsConstructor @NoArgsConstructor public class DavisMessage { String uuid; float latitude; float longitude; float altitude; String ts; float temp_out; float temp_in; int hum_out; int hum_in; int barometer; float rain; float rain_rate; float wind_avg; int wind_dir; float wind_high; int solar; float uv; }