MCPcopy
hub / github.com/feast-dev/feast / KafkaOptions

Class KafkaOptions

sdk/python/feast/data_source.py:34–97  ·  view source on GitHub ↗

DataSource Kafka options used to source features from Kafka messages

Source from the content-addressed store, hash-verified

32
33
34class KafkaOptions:
35 """
36 DataSource Kafka options used to source features from Kafka messages
37 """
38
39 def __init__(
40 self,
41 kafka_bootstrap_servers: str,
42 message_format: StreamFormat,
43 topic: str,
44 watermark_delay_threshold: Optional[timedelta] = None,
45 ):
46 self.kafka_bootstrap_servers = kafka_bootstrap_servers
47 self.message_format = message_format
48 self.topic = topic
49 self.watermark_delay_threshold = watermark_delay_threshold or None
50
51 @classmethod
52 def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
53 """
54 Creates a KafkaOptions from a protobuf representation of a kafka option
55
56 Args:
57 kafka_options_proto: A protobuf representation of a DataSource
58
59 Returns:
60 Returns a KafkaOptions object based on the kafka_options protobuf
61 """
62 watermark_delay_threshold = None
63 if kafka_options_proto.HasField("watermark_delay_threshold"):
64 watermark_delay_threshold = (
65 timedelta(days=0)
66 if kafka_options_proto.watermark_delay_threshold.ToNanoseconds() == 0
67 else kafka_options_proto.watermark_delay_threshold.ToTimedelta()
68 )
69 kafka_options = cls(
70 kafka_bootstrap_servers=kafka_options_proto.kafka_bootstrap_servers,
71 message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
72 topic=kafka_options_proto.topic,
73 watermark_delay_threshold=watermark_delay_threshold,
74 )
75
76 return kafka_options
77
78 def to_proto(self) -> DataSourceProto.KafkaOptions:
79 """
80 Converts an KafkaOptionsProto object to its protobuf representation.
81
82 Returns:
83 KafkaOptionsProto protobuf
84 """
85 watermark_delay_threshold = None
86 if self.watermark_delay_threshold is not None:
87 watermark_delay_threshold = Duration()
88 watermark_delay_threshold.FromTimedelta(self.watermark_delay_threshold)
89
90 kafka_options_proto = DataSourceProto.KafkaOptions(
91 kafka_bootstrap_servers=self.kafka_bootstrap_servers,

Callers 1

__init__Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected