# 策展 · X (Twitter) 🔥

> 作者：CocoIndex (@cocoindex_io) · 平台：X (Twitter) · 日期：2026-04-29

> 原始來源：https://x.com/cocoindex_io/status/2049142394432729278

## 中文摘要

# 從持續變動的非結構化資料到 Kafka 串流 — 使用 CocoIndex

CocoIndex 剛剛新增了 Kafka 目標 connector。現在，你可以像宣告 Postgres 資料表或向量索引一樣，將 Kafka 主題 (topic) 宣告為 pipeline 的目標。當你的來源資料發生變動時，CocoIndex 會自動增量產生訊息，無需編寫 producer 迴圈、無需維護狀態，也不必處理「我是否已經發布過這列資料？」這類邏輯。

## 試用看看

```bash
git clone https://github.com/cocoindex-io/cocoindex
cd cocoindex/examples/csv_to_kafka
cp .env.example .env  # 填入你的 Kafka bootstrap 與 SASL 憑證
pip install -e .
cocoindex update -L main.py
```

## 從靜態知識到串流訊號

現今許多 Agent 堆疊都是圍繞著知識來源的定期快照來建構的。Wiki 頁面在隔夜重新索引、程式庫透過 cron 定期重新嵌入 (re-embedded)、CRM 系統按排程重新拉取資料，而 Agent 則不斷讀取這些快照，希望能偵測到變動。對於可能執行數小時的長效型 Agent 來說，執行開始時捕捉的快照很快就會與底層資料不同步。

常見的下一步是透過點對點 (point-to-point) 的 webhooks 將來源直接連接到消費者，但這種方法一旦涉及多個消費者，就會出現眾所周知的限制。它缺乏重播 (replay) 或回填 (backfill) 的共享路徑，沒有緩衝區來吸收大量變動同時湧入時的突發流量，也沒有描述跨系統「變動」樣貌的通用 schema。走這條路的團隊，往往最終會在每個整合項目中，各自重新實作一套持久化日誌 (durable log) 的功能。

![](https://pub-75d4fe1e4e80421b9ecb1245a7ae0d1a.r2.dev/curated/1777433246137-diaHGQKeWbcAAUv9Wjpg.jpg)

CocoIndex 與 Kafka 的結合採取了不同的途徑：它將知識層視為與多年來處理營運資料相同的方式——即變動事件流，而非需要重複讀取的快照。硬碟、儲存庫、設計文件、Wiki、PDF 和檔案共用——這些傳統上存在於串流世界之外的非結構化資料——現在可以發布到與訂單、點擊和 CDC 流量相同的事件主幹 (event backbone) 上。其優勢體現在多個方面：

![](https://pub-75d4fe1e4e80421b9ecb1245a7ae0d1a.r2.dev/curated/1777433245955-diaHGQQyFbEAALFTijpg.jpg)

- **更高效的 AI 工作負載**：嵌入 (embeddings)、檢索和 Agent 記憶只有在資料真正發生變動時才會更新，這既減少了冗餘工作，同時也提升了資料的新鮮度。

- **單一變動觸及所有消費者**：一次 commit、一個重新命名的 Drive 文件或 Notion 編輯，即可同時更新向量索引、通知 Agent、更新搜尋結果、餵給 Flink 作業並寫入 BI 儀表板——而這些系統彼此之間完全不需要互相了解。

- **更易於擴充**：新的 Agent、重建的 RAG 層或合規工具可以作為主題的另一個訂閱者加入，且日誌提供了重播功能，讓它們能以與處理新變動相同的方式處理歷史變動。

- **更好的可稽核性**：Agent 所消耗的每個變動都透過 offset 和時間戳記進行了持久化記錄，這使得回答「Agent 在採取行動前是否看到了更新後的政策？」這類問題時，能有具體的證據支持。

- **長期的穩定契約**：主題上的變動事件 schema 在來源與消費者之間提供了穩定的介面。偵測器、來源和模型可以獨立演進，而傳輸格式保持一致。

## Kafka：訊息進，訊息出 — 現在擴展到非結構化世界

Kafka 的契約非常簡單：訊息進，訊息出。正是這種單一的形狀，使得串流生態系統的其他部分——Flink、ksqlDB、向量資料庫、OLAP 接收端、搜尋後端、微服務、Agent 執行環境——能夠進行組合。任何下游系統只需讀取主題，就能獲得即時、可重播且預設為廣播 (fan-out) 的世界視圖。

問題在於，過去進入 Kafka 的訊息通常是結構化的：訂單、點擊、IoT 遙測資料，或是來自 Postgres 或 MySQL 的 Debezium CDC。而業務的另一半——會議記錄、程式庫、設計文件、PDF、檔案共用、Wiki——則生活在一個平行的宇宙中，充斥著廠商特定的 webhooks、每晚執行的批次作業，以及沒人想維護的一次性 ETL 腳本。

![](https://pub-75d4fe1e4e80421b9ecb1245a7ae0d1a.r2.dev/curated/1777433246315-diaHGQe2TbIAAB60Wjpg.jpg)

CocoIndex 填補了這個缺口。它將動態變動的非結構化 asset 視為一等公民的 CDC 來源，並將乾淨的鍵值 (key/value) 變動事件發送到 Kafka，並具備本文稍後將描述的 upsert / delete / no-op 語意。

## 與 StreamNative 的合作

我們正與 StreamNative 合作，將即時資料基礎設施引入 AI 工作負載。這兩者完美契合，因為它們解決了不同的問題：CocoIndex 觀察雜亂、動態的非結構化來源，並對「自上次以來有什麼不同？」給出精確的答案。StreamNative 基於 Ursa 的 Kafka 服務則確保該答案能可靠、大規模且完整保留歷史地傳達給每個關心的系統。有了 Kafka 在中間，來源不必表現得像串流，Agent 也不必表現得像資料庫客戶端；主題吸收了阻抗不匹配 (impedance mismatch)，讓雙方都能按照各自的節奏演進。

## 範例：CSV 檔案 → JSON 訊息

為了讓這一切具體化，我們將建構一個最小的 End to End (端到端) pipeline 來測試這個新的 connector：一個本地的 `data/` 資料夾存放 CSV 檔案，進行即時監控，並將每一列資料作為 JSON 訊息發布到 StreamNative 上的 Kafka 主題。編輯一個儲存格，一秒內該列資料對應的一則訊息就會出現在主題上。新增一列，就得到一則新訊息。刪除一個檔案，該檔案中的每一列都會被標記為刪除 (tombstoned)。整個程式大約只有 60 行 Python 程式碼。

完整範例位於 CocoIndex 儲存庫中。結構如下：
https://github.com/cocoindex-io/cocoindex/tree/main/examples/csv_to_kafka

## Pipeline

首先，Kafka producer 在應用程式啟動時透過 lifespan hook 設定一次，並存放在 `ContextKey` 中，以便 pipeline 的其餘部分可以呼叫它，而無需到處傳遞：

```python
import cocoindex as coco
from cocoindex.connectors import kafka, localfs
from confluent_kafka.aio import AIOProducer

KAFKA_PRODUCER = coco.ContextKey[AIOProducer]("kafka_producer", tracked=False)

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder):
    config = {
        "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_SSL",
        "sasl.username": KAFKA_SASL_USERNAME,
        "sasl.password": KAFKA_SASL_PASSWORD,
    }
    producer = AIOProducer(config)
    builder.provide(KAFKA_PRODUCER, producer)
    yield
```

接下來是針對每個檔案的處理器。這是新的 Kafka API 出現的地方：

```python
@coco.fn(memo=True)
async def process_csv(file: FileLike, topic_target: kafka.KafkaTopicTarget) -> None:
    text = await file.read_text()
    reader = csv.DictReader(io.StringIO(text))

    headers = reader.fieldnames
    if not headers:
        return
    first_col = headers[0]

    for row in reader:
        key_value = row.get(first_col)
        if key_value is not None:
            value = json.dumps(row)
            topic_target.declare_target_state(key=key_value, value=value)
```

宣告狀態，而非訊息

```
topic_target.declare_target_state(key=key, value=value)
```

CocoIndex 是一個狀態驅動的資料框架。其心智模型與你用於試算表、React 元件樹或 SQL 物化視圖的模型相同：你描述目標應該是什麼樣子（作為來源的函數），框架則負責處理轉換。你不需要計算差異 (deltas)。你不需要追蹤「我上次發送了什麼」。你不需要將新增、更新與刪除處理為不同的程式碼路徑。你只需要說：「給定這列 CSV 資料，鍵值 SKU001 的目標狀態就是這個 JSON blob」，這樣就完成了。

![](https://pub-75d4fe1e4e80421b9ecb1245a7ae0d1a.r2.dev/curated/1777433246459-diaHGRlmia8AANu8Kjpg.jpg)

Kafka 讓這種區別變得異常明顯，因為 Kafka 的傳輸模型恰恰相反：主題是事件的日誌，而不是快照。Producer 發送變動事件；消費者（或壓縮過的主題）從日誌中重建狀態。因此問題在於：誰負責處理「我有期望的狀態」與「Broker 需要接收變動事件」之間的差距？

訊息是從狀態轉換中導出的。你永遠只需要討論狀態。這與 Postgres 目標 (`declare_target_state` → INSERT / UPDATE / DELETE) 和向量索引目標的模式完全相同——傳輸層的操作可能不同，但使用者介面 API 的形狀是一樣的，因為語意是相同的。

這在實務上之所以重要，是因為這意味著相同的 `process_csv` 函數在第一次執行、後續每次執行、資料列被編輯、資料列被移除、檔案被刪除，或是整個 pipeline 當機重啟時，都能正確運作。沒有所謂「初始載入」與「增量更新」的獨立程式碼路徑。只有「給定來源，目標應該是什麼樣子」，而無論目標是空的、半滿的還是已經同步的，這個陳述都是正確的。

```python
@coco.fn
async def app_main() -> None:
    topic_target = await kafka.mount_kafka_topic_target(KAFKA_PRODUCER, KAFKA_TOPIC)

    files = localfs.walk_dir(
        localfs.FilePath(path="./data"),
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.csv"]),
        live=True,
    )
    await coco.mount_each(process_csv, files.items(), topic_target)

app = coco.App(coco.AppConfig(name="CsvToKafka"), app_main)
```

## Live 模式：一個旗標，其餘皆同

到目前為止，我們描述的是單次執行的情況。但在現實中，來源檔案會被編輯，資料列會被新增或移除，而你通常希望主題能跟上這些變動。相同的 `process_csv` 可以作為補追 (catch-up) 執行——掃描一次，協調自上次以來的所有變動，然後退出——或者作為持續運行的 pipeline，不斷監控變動。兩者之間的差異僅在於一個關鍵字參數和一個 CLI 旗標：

補追執行：

```python
files = localfs.walk_dir(
    localfs.FilePath(path="./data"),
    path_matcher=PatternFilePathMatcher(included_patterns=["**/*.csv"]),
)
await coco.mount_each(process_csv, files.items(), topic_target)
```

```python
cocoindex update main.py
```

Live 模式：

```python
files = localfs.walk_dir(
    localfs.FilePath(path="./data"),
    path_matcher=PatternFilePathMatcher(included_patterns=["**/*.csv"]),
    live=True,                 # ← +1 行
)
await coco.mount_each(process_csv, files.items(), topic_target)
```

這就是全部的差異。`process_csv` 不需要改變。Kafka 目標也不需要改變。沒有獨立的「串流」程式碼路徑需要維護。

執行 Live 版本：

```python
cocoindex update -L main.py
```

CocoIndex 會先進行完整掃描，為每一列發布一則訊息，然後持續監控 `data/`：

- 編輯 `products.csv` 中的一個儲存格 — 該列資料會產生一則 Kafka 訊息（不考慮 Broker 重試；Producer 預設為至少一次傳遞）。其他四列則保持靜默。

- 新增一列 — 產生一則新訊息。

- 刪除一列 — 產生一則刪除訊息（無值，因為此範例未提供 `deletion_value_fn`）。

- 新增一個全新的 CSV 檔案 — `process_csv` 對該檔案執行一次，並發布其資料列。

- 刪除一個 CSV 檔案 — 該檔案中的每一列都會收到一則刪除訊息。

## 查看主題

我們將此連接到 StreamNative Cloud 上的 Kafka 叢集——它讓我們一鍵獲得了真實的 `SASL_SSL` 端點，並附帶一個託管控制台，無需編寫消費者即可檢查訊息。（如果你在 Producer 設定中跳過 SASL 欄位，標準的 `localhost:9092` 也可以運作。）

執行範例後，`cocoindex-csv-rows` 主題在控制台中顯示的內容如下：

![](https://pub-75d4fe1e4e80421b9ecb1245a7ae0d1a.r2.dev/curated/1777433245947-diaHGSIq7bsAAv0snjpg.jpg)

## 支持我們的工作

如果這個專案對你有幫助，我們將非常感謝你在 CocoIndex 專案上給予一個 GitHub Star。

## 標籤

功能更新, 自動化, 開源專案, CocoIndex, Kafka
