sguzzo asked

How can I send a timestamp or date type with a Pulsar schema and consume it in an Astra sink?


I'm having difficulty consuming data correctly with an Astra sink.
The issue stems from the fact that some of the columns I have to populate in my Astra table are of type date or timestamp. When sending this data from my producer (using the Python client) I define a schema, but apparently I cannot assign the types timestamp or date to a field in my schema (please refer to this page).

So the question is: how can I send date or timestamp and have them correctly consumed by my sink? Should I use Pulsar functions?

Additional info

  • Astra table description:
CREATE TABLE streaming_test.test_streaming_timestamp (
    sensor text,
    added timeuuid,
    temperature int,
    timestamp timestamp,
    PRIMARY KEY (sensor, added)
);

  • mapping
sensor=value.sensor, added=now(), temperature=value.temperature, timestamp=value.timestamp

  • python producer script
import uuid
import pulsar
from pulsar.schema import *
from datetime import datetime
import random

class Example(Record):
    sensor = String()
    temperature = Integer()
    timestamp = String()

service_url = 'pulsar+ssl://'

# Use default CA certs for your environment
# Debian/Ubuntu:


client = pulsar.Client(service_url,

producer = client.create_producer('persistent://my-tenant/my-ns/my-topic', schema=AvroSchema(Example))

now = datetime.now().strftime("%Y%m%dT%H:%M:%S")

producer.send(Example(sensor=str(uuid.uuid4()), temperature=random.randrange(20,30), timestamp=now))




P.S. tagging @Aleks Volochnev so you can follow the conversation!

Tags: astra db, astra streaming

1 Answer

Erick Ramirez answered

I've reached out to the Astra Streaming team here at DataStax and they advised that you need to define codecs on the Pulsar topic for the conversion to work. For example:

      locale: en_US
      timeZone: UTC
      timestamp: CQL_TIMESTAMP
      date: ISO_LOCAL_DATE
      time: ISO_LOCAL_TIME
      unit: MILLISECONDS

For details, see Converting date and times for a topic. Cheers!
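To make the codec names above more concrete, here is a minimal Python sketch of what the corresponding values might look like on the producer side. It assumes that `ISO_LOCAL_DATE` and `ISO_LOCAL_TIME` follow the Java `DateTimeFormatter` constants of the same name (`yyyy-MM-dd` and `HH:mm:ss`), and that `unit: MILLISECONDS` refers to numeric timestamps counted from the Unix epoch. The sample values are hypothetical and only illustrate the shapes.

```python
from datetime import date, datetime, time, timezone

# Hypothetical sample values, chosen only to illustrate the formats.
sample_date = date(2022, 3, 14)
sample_time = time(9, 26, 53)
sample_instant = datetime(2022, 3, 14, 9, 26, 53, tzinfo=timezone.utc)

# Assumed shape for ISO_LOCAL_DATE: year-month-day with dashes.
iso_local_date = sample_date.isoformat()

# Assumed shape for ISO_LOCAL_TIME: hours:minutes:seconds.
iso_local_time = sample_time.isoformat()

# Assumed meaning of unit MILLISECONDS: milliseconds since the Unix epoch (UTC).
epoch_millis = int(sample_instant.timestamp() * 1000)

print(iso_local_date)
print(iso_local_time)
print(epoch_millis)
```

A string field in the schema can carry either of the first two values, and an integer field can carry the third; which one the sink accepts depends on the codec settings actually in effect.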

4 comments

sguzzo commented

Hi Erick,

Thanks for your answer!
The only way I know to do this is through the `pulsar-admin` CLI, e.g.:

./bin/pulsar-admin sinks create \
  --name test-sink-cli \
  --sink-type cassandra-enhanced \
  --inputs persistent://my-tenant/my-ns/my-topic \
  --sink-config-file ./my-sink-config.yaml

where `my-sink-config.yaml` contains the configuration for my sink, including the codecs snippet you posted above.
Unfortunately, this is not possible with Astra because I, as the user, don't have admin rights. I don't know how to view the actual configuration, if that is even possible.
When creating a sink in Astra, I only have control over the mapping.

Am I missing something?


Erick Ramirez commented (replying to sguzzo)

Sorry, I forgot that you're on Astra Streaming.

I was told that you don't need to configure the mapping if you are using Avro or JSON.

Going back to your original question, do you have a minimal code sample that shows how you're sending data and what errors you're getting? Note that you won't be able to post a large amount of text as a comment so you'll need to post the update in your original question above. Cheers!

sguzzo commented (replying to Erick Ramirez)

Hi Erick! I accepted the answer because I found that my mistake was indeed a badly formatted timestamp (dumb of me!).

I was sending messages with the timestamp formatted as `"%Y%m%dT%H:%M:%S"`, but I needed dashes between year, month, and day!
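As a minimal sketch of that fix, the snippet below formats the same moment with the original pattern and with a dashed pattern. The fixed `moment` value is hypothetical and is there only to make the result reproducible; the dashed pattern `"%Y-%m-%dT%H:%M:%S"` is an assumption based on the description above, not the exact code from the producer.

```python
from datetime import datetime

# Hypothetical fixed moment, used only so the result is reproducible.
moment = datetime(2022, 3, 14, 9, 26, 53)

# Pattern from the original producer: no dashes in the date part.
compact = moment.strftime("%Y%m%dT%H:%M:%S")

# Assumed corrected pattern: dashes between year, month, and day.
dashed = moment.strftime("%Y-%m-%dT%H:%M:%S")

print(compact)  # 20220314T09:26:53
print(dashed)   # 2022-03-14T09:26:53
```

For a value without microseconds, `moment.isoformat(timespec="seconds")` yields the same dashed string, which avoids spelling the pattern out by hand.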

Mystery solved, thanks a lot!!
