From 3e24fa7ae2244e8048592dfe79c3154decfc90c6 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 18 Jun 2026 16:42:14 +0700 Subject: [PATCH 1/3] feat: log context api request - ``` { "stream": "teststream", "contextWindow": "1m", "pTimestamp": "2026-06-18T07:39:59.995Z", "pageSize": 500, "message": "Application started", "conditions": { "operator": "and", "groups": [ { "operator": "or", "conditionConfig": [ { "column": "level", "operator": "=", "value": "warn", "type": "text" } ] } ] } } ``` response - ``` { "scope": "contextWindow", "contextStartTime": "2026-06-18T07:38:00Z", "contextEndTime": "2026-06-18T07:40:00Z", "limit": 500, "anchorIndex": 10, "duplicateAnchorCount": 15, "anchoredDuplicate": "first", "records": [ { "app_meta": "okcequedfmkqlgzheaidrcce", "device_id": 126.0, "host": "172.162.1.120", "level": "warn", "location": "uqwetjbuvjameflh", "message": "Application is failing", "meta-containerimage": "ghcr.io/parseablehq/quest", "meta-containername": "log-generator", "meta-host": "10.116.0.3", "meta-namespace": "go-apasdp", "meta-podlabels": "app=go-app,pod-template-hash=6c87bc9cc9", "meta-source": "quest-test", "os": "Windows", "p_src_ip": "127.0.0.1", "p_timestamp": "2026-06-18T07:39:59.995", "p_user_agent": "Grafana k6/1.6.1", "request_body": "vlywlgkpmciorkiklfruxcfnzaspahyscsazpmnqgquqrtahrzhmtojwvackzcqngscesuadnupwpdsryfrvlifembjotnftzuwx", "session_id": "pqr", "source_time": "2026-06-18T07:39:59.991", "status_code": 500.0, "user_id": 98513.0, "uuid": "169fa593-fa27-4625-8576-1faab8b9cc71", "version": "1.2.0" } ], "queries": { "previous": { "query": "SELECT * FROM (SELECT * FROM \"teststream\" WHERE ((\"p_timestamp\" >= TIMESTAMP '2026-06-18 07:38:00.000' AND \"p_timestamp\" < TIMESTAMP '2026-06-18 07:40:00.000') AND ((\"level\" = 'warn'))) AND (\"p_timestamp\" > TIMESTAMP '2026-06-18 07:39:59.995' OR (\"p_timestamp\" = TIMESTAMP '2026-06-18 07:39:59.995' AND \"message\" < 'Application is failing')) ORDER BY \"p_timestamp\" ASC, \"message\" DESC LIMIT 500) AS log_context_seek_page ORDER BY \"p_timestamp\" DESC, \"message\" ASC", "startTime": "2026-06-18T07:38:00Z", "endTime": "2026-06-18T07:40:00Z", "sendNull": false }, "next": { "query": "SELECT * FROM \"teststream\" WHERE ((\"p_timestamp\" >= TIMESTAMP '2026-06-18 07:38:00.000' AND \"p_timestamp\" < TIMESTAMP '2026-06-18 07:40:00.000') AND ((\"level\" = 'warn'))) AND (\"p_timestamp\" < TIMESTAMP '2026-06-18 07:39:59.662' OR (\"p_timestamp\" = TIMESTAMP '2026-06-18 07:39:59.662' AND \"message\" > 'Logging a request')) ORDER BY \"p_timestamp\" DESC, \"message\" ASC LIMIT 500", "startTime": "2026-06-18T07:38:00Z", "endTime": "2026-06-18T07:40:00Z", "sendNull": false } } } ``` --- Cargo.lock | 437 +++------ src/handlers/http/mod.rs | 1 + src/handlers/http/modal/query_server.rs | 1 + src/handlers/http/modal/server.rs | 12 + src/handlers/http/query.rs | 39 + src/handlers/http/query_context.rs | 1188 +++++++++++++++++++++++ 6 files changed, 1351 insertions(+), 327 deletions(-) create mode 100644 src/handlers/http/query_context.rs diff --git a/Cargo.lock b/Cargo.lock index 0bdf2bfc7..b19e3bf8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -487,40 +487,19 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d441fdda254b65f3e9025910eb2c2066b6295d9c8ed409522b8d2ace1ff8574c" dependencies = [ - "arrow-arith 58.1.0", - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-cast 58.1.0", - "arrow-csv 58.1.0", - "arrow-data 58.1.0", - "arrow-ipc 58.1.0", - "arrow-json 58.1.0", - "arrow-ord 58.1.0", - "arrow-row 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", - "arrow-string 58.1.0", -] - -[[package]] -name = "arrow" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffaaa3e009861fd829d0a24dd6f115aa8e4634324bb092147d43baafe69ca4a7" -dependencies = [ - "arrow-arith 59.0.0", - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-cast 59.0.0", - "arrow-csv 59.0.0", - "arrow-data 59.0.0", - "arrow-ipc 59.0.0", - "arrow-json 59.0.0", - "arrow-ord 59.0.0", - "arrow-row 59.0.0", - "arrow-schema 59.0.0", - "arrow-select 59.0.0", - "arrow-string 59.0.0", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", ] [[package]] @@ -529,24 +508,10 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced5406f8b720cc0bc3aa9cf5758f93e8593cda5490677aa194e4b4b383f9a59" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "chrono", - "num-traits", -] - -[[package]] -name = "arrow-arith" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ac95125e1d71c4a252b5a9c729aef111e80418f08aaa6dbabd1ba66918247fc" -dependencies = [ - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-data 59.0.0", - "arrow-schema 59.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "num-traits", ] @@ -558,9 +523,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "772bd34cacdda8baec9418d80d23d0fb4d50ef0735685bd45158b83dfeb6e62d" dependencies = [ "ahash", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "chrono-tz", "half", @@ -570,24 +535,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "arrow-array" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c60c79628e9a97cb90d7a0dc3e944f216a902f837d4ecabc14d524bddbbc137" -dependencies = [ - "ahash", - "arrow-buffer 59.0.0", - "arrow-data 59.0.0", - "arrow-schema 59.0.0", - "chrono", - "half", - "hashbrown 0.17.1", - "num-complex", - "num-integer", - "num-traits", -] - [[package]] name = "arrow-buffer" version = "58.1.0" @@ -600,30 +547,18 @@ dependencies = [ "num-traits", ] -[[package]] -name = "arrow-buffer" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6026f638c400e9878c1b1cc05c3cfd46fbf381285916ab408678701c1df46c1a" -dependencies = [ - "bytes", - "half", - "num-bigint", - "num-traits", -] - [[package]] name = "arrow-cast" version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0127816c96533d20fc938729f48c52d3e48f99717e7a0b5ade77d742510736d" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-ord 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ord", + "arrow-schema", + "arrow-select", "atoi", "base64", "chrono", @@ -634,51 +569,15 @@ dependencies = [ "ryu", ] -[[package]] -name = "arrow-cast" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82c236c3caf8df5664284f3f1fbe89938852163998c3fdbf37e84ac220445e9" -dependencies = [ - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-data 59.0.0", - "arrow-ord 59.0.0", - "arrow-schema 59.0.0", - "arrow-select 59.0.0", - "atoi", - "base64", - "chrono", - "half", - "lexical-core", - "num-traits", - "ryu", -] - [[package]] name = "arrow-csv" version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca025bd0f38eeecb57c2153c0123b960494138e6a957bbda10da2b25415209fe" dependencies = [ - "arrow-array 58.1.0", - "arrow-cast 58.1.0", - "arrow-schema 58.1.0", - "chrono", - "csv", - "csv-core", - "regex", -] - -[[package]] -name = "arrow-csv" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12714e5fb7954159af1e26d4e0d37108bcf1a2ad5ee5c5bf02a944d564d588b7" -dependencies = [ - "arrow-array 59.0.0", - "arrow-cast 59.0.0", - "arrow-schema 59.0.0", + "arrow-array", + "arrow-cast", + "arrow-schema", "chrono", "csv", "csv-core", @@ -691,21 +590,8 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42d10beeab2b1c3bb0b53a00f7c944a178b622173a5c7bcabc3cb45d90238df4" dependencies = [ - "arrow-buffer 58.1.0", - "arrow-schema 58.1.0", - "half", - "num-integer", - "num-traits", -] - -[[package]] -name = "arrow-data" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bd568aa70c4ec5947027b0d5caee94877433b661a0bb9e8ddceeeb5f0c9b1ab" -dependencies = [ - "arrow-buffer 59.0.0", - "arrow-schema 59.0.0", + "arrow-buffer", + "arrow-schema", "half", "num-integer", "num-traits", @@ -717,11 +603,11 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302b2e036335f3f04d65dad3f74ff1f2aae6dc671d6aa04dc6b61193761e16fb" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-cast 58.1.0", - "arrow-ipc 58.1.0", - "arrow-schema 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ipc", + "arrow-schema", "base64", "bytes", "futures", @@ -737,66 +623,27 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "609a441080e338147a84e8e6904b6da482cefb957c5cdc0f3398872f69a315d0" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "flatbuffers", "lz4_flex", "zstd", ] -[[package]] -name = "arrow-ipc" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e57ee4d470eab1a021bc4b63fa2b2c15d572892bf227b0a982d3b755a6c662b5" -dependencies = [ - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-data 59.0.0", - "arrow-schema 59.0.0", - "arrow-select 59.0.0", - "flatbuffers", -] - [[package]] name = "arrow-json" version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ead0914e4861a531be48fe05858265cf854a4880b9ed12618b1d08cba9bebc8" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-cast 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "chrono", - "half", - "indexmap", - "itoa", - "lexical-core", - "memchr", - "num-traits", - "ryu", - "serde_core", - "serde_json", - "simdutf8", -] - -[[package]] -name = "arrow-json" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f47e0e7a284e1f3707a780dc8cd5451b1614e9e398ea2d9ca03c7a2fe9a9ed" -dependencies = [ - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-cast 59.0.0", - "arrow-ord 59.0.0", - "arrow-schema 59.0.0", - "arrow-select 59.0.0", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", "chrono", "half", "indexmap", @@ -816,24 +663,11 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "763a7ba279b20b52dad300e68cfc37c17efa65e68623169076855b3a9e941ca5" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", -] - -[[package]] -name = "arrow-ord" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a79cf73ad2eba8686ec2aa9bbf8671208e509025f166afc040cedbd94ffe4983" -dependencies = [ - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-data 59.0.0", - "arrow-schema 59.0.0", - "arrow-select 59.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", ] [[package]] @@ -842,23 +676,10 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e14fe367802f16d7668163ff647830258e6e0aeea9a4d79aaedf273af3bdcd3e" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "half", -] - -[[package]] -name = "arrow-row" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea0f7d8ed6182f14952761e2c0f989852d5aa334fcbc49f73a9f2247c25b879" -dependencies = [ - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-data 59.0.0", - "arrow-schema 59.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "half", ] @@ -873,12 +694,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "arrow-schema" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80b3e786a0dd9103acd583a6fb486dbf2f3268466cc0bd571dcf34cef231c1f1" - [[package]] name = "arrow-select" version = "58.1.0" @@ -886,24 +701,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78694888660a9e8ac949853db393af2a8b8fc82c19ce333132dfa2e72cc1a7fe" dependencies = [ "ahash", - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "num-traits", -] - -[[package]] -name = "arrow-select" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "067a67e0361f6c31f4a7248759f36ca4ca71b187a941ed4d49da1c7d3d4db624" -dependencies = [ - "ahash", - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-data 59.0.0", - "arrow-schema 59.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "num-traits", ] @@ -913,28 +714,11 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61e04a01f8bb73ce54437514c5fd3ee2aa3e8abe4c777ee5cc55853b1652f79e" dependencies = [ - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", - "memchr", - "num-traits", - "regex", - "regex-syntax", -] - -[[package]] -name = "arrow-string" -version = "59.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99bc95847f3ff62a2b03d6f8ce2e3e78f01362060549a2a311898dd442f6256d" -dependencies = [ - "arrow-array 59.0.0", - "arrow-buffer 59.0.0", - "arrow-data 59.0.0", - "arrow-schema 59.0.0", - "arrow-select 59.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "memchr", "num-traits", "regex", @@ -1743,8 +1527,8 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93db0e623840612f7f2cd757f7e8a8922064192363732c88692e0870016e141b" dependencies = [ - "arrow 58.1.0", - "arrow-schema 58.1.0", + "arrow", + "arrow-schema", "async-trait", "bytes", "bzip2", @@ -1798,7 +1582,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37cefde60b26a7f4ff61e9d2ff2833322f91df2b568d7238afe67bde5bdffb66" dependencies = [ - "arrow 58.1.0", + "arrow", "async-trait", "dashmap", "datafusion-common", @@ -1823,7 +1607,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17e112307715d6a7a331111a4c2330ff54bc237183511c319e3708a4cff431fb" dependencies = [ - "arrow 58.1.0", + "arrow", "async-trait", "datafusion-catalog", "datafusion-common", @@ -1847,8 +1631,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d72a11ca44a95e1081870d3abb80c717496e8a7acb467a1d3e932bb636af5cc2" dependencies = [ "ahash", - "arrow 58.1.0", - "arrow-ipc 58.1.0", + "arrow", + "arrow-ipc", "chrono", "half", "hashbrown 0.16.1", @@ -1882,7 +1666,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9fb386e1691355355a96419978a0022b7947b44d4a24a6ea99f00b6b485cbb6" dependencies = [ - "arrow 58.1.0", + "arrow", "async-compression", "async-trait", "bytes", @@ -1917,8 +1701,8 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffa6c52cfed0734c5f93754d1c0175f558175248bf686c944fb05c373e5fc096" dependencies = [ - "arrow 58.1.0", - "arrow-ipc 58.1.0", + "arrow", + "arrow-ipc", "async-trait", "bytes", "datafusion-common", @@ -1941,7 +1725,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "503f29e0582c1fc189578d665ff57d9300da1f80c282777d7eb67bb79fb8cdca" dependencies = [ - "arrow 58.1.0", + "arrow", "async-trait", "bytes", "datafusion-common", @@ -1964,7 +1748,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e33804749abc8d0c8cb7473228483cb8070e524c6f6086ee1b85a64debe2b3d2" dependencies = [ - "arrow 58.1.0", + "arrow", "async-trait", "bytes", "datafusion-common", @@ -1988,7 +1772,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a8e0365e0e08e8ff94d912f0ababcf9065a1a304018ba90b1fc83c855b4997" dependencies = [ - "arrow 58.1.0", + "arrow", "async-trait", "bytes", "datafusion-common", @@ -2024,8 +1808,8 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c03c7fbdaefcca4ef6ffe425a5fc2325763bfb426599bb0bf4536466efabe709" dependencies = [ - "arrow 58.1.0", - "arrow-buffer 58.1.0", + "arrow", + "arrow-buffer", "async-trait", "chrono", "dashmap", @@ -2047,7 +1831,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "574b9b6977fedbd2a611cbff12e5caf90f31640ad9dc5870f152836d94bad0dd" dependencies = [ - "arrow 58.1.0", + "arrow", "async-trait", "chrono", "datafusion-common", @@ -2070,7 +1854,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d7c3adf3db8bf61e92eb90cb659c8e8b734593a8f7c8e12a843c7ddba24b87e" dependencies = [ - "arrow 58.1.0", + "arrow", "datafusion-common", "indexmap", "itertools 0.14.0", @@ -2083,8 +1867,8 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f28aa4e10384e782774b10e72aca4d93ef7b31aa653095d9d4536b0a3dbc51b6" dependencies = [ - "arrow 58.1.0", - "arrow-buffer 58.1.0", + "arrow", + "arrow-buffer", "base64", "blake2", "blake3", @@ -2116,7 +1900,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00aa6217e56098ba84e0a338176fe52f0a84cca398021512c6c8c5eff806d0ad" dependencies = [ "ahash", - "arrow 58.1.0", + "arrow", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -2138,7 +1922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b511250349407db7c43832ab2de63f5557b19a20dfd236b39ca2c04468b50d47" dependencies = [ "ahash", - "arrow 58.1.0", + "arrow", "datafusion-common", "datafusion-expr-common", "datafusion-physical-expr-common", @@ -2150,8 +1934,8 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef13a858e20d50f0a9bb5e96e7ac82b4e7597f247515bccca4fdd2992df0212a" dependencies = [ - "arrow 58.1.0", - "arrow-ord 58.1.0", + "arrow", + "arrow-ord", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -2175,7 +1959,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b40d3f5bbb3905f9ccb1ce9485a9595c77b69758a7c24d3ba79e334ff51e7e" dependencies = [ - "arrow 58.1.0", + "arrow", "async-trait", "datafusion-catalog", "datafusion-common", @@ -2191,7 +1975,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e88ec9d57c9b685d02f58bfee7be62d72610430ddcedb82a08e5d9925dbfb6" dependencies = [ - "arrow 58.1.0", + "arrow", "datafusion-common", "datafusion-doc", "datafusion-expr", @@ -2230,7 +2014,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e929015451a67f77d9d8b727b2bf3a40c4445fdef6cdc53281d7d97c76888ace" dependencies = [ - "arrow 58.1.0", + "arrow", "chrono", "datafusion-common", "datafusion-expr", @@ -2251,7 +2035,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b1e68aba7a4b350401cfdf25a3d6f989ad898a7410164afe9ca52080244cb59" dependencies = [ "ahash", - "arrow 58.1.0", + "arrow", "datafusion-common", "datafusion-expr", "datafusion-expr-common", @@ -2274,7 +2058,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea22315f33cf2e0adc104e8ec42e285f6ed93998d565c65e82fec6a9ee9f9db4" dependencies = [ - "arrow 58.1.0", + "arrow", "datafusion-common", "datafusion-expr", "datafusion-functions", @@ -2290,7 +2074,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b04b45ea8ad3ac2d78f2ea2a76053e06591c9629c7a603eda16c10649ecf4362" dependencies = [ "ahash", - "arrow 58.1.0", + "arrow", "chrono", "datafusion-common", "datafusion-expr-common", @@ -2306,7 +2090,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cb13397809a425918f608dfe8653f332015a3e330004ab191b4404187238b95" dependencies = [ - "arrow 58.1.0", + "arrow", "datafusion-common", "datafusion-execution", "datafusion-expr", @@ -2326,9 +2110,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5edc023675791af9d5fb4cc4c24abf5f7bd3bd4dcf9e5bd90ea1eff6976dcc79" dependencies = [ "ahash", - "arrow 58.1.0", - "arrow-ord 58.1.0", - "arrow-schema 58.1.0", + "arrow", + "arrow-ord", + "arrow-schema", "async-trait", "datafusion-common", "datafusion-common-runtime", @@ -2357,7 +2141,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac8c76860e355616555081cab5968cec1af7a80701ff374510860bcd567e365a" dependencies = [ - "arrow 58.1.0", + "arrow", "datafusion-common", "datafusion-datasource", "datafusion-expr-common", @@ -2388,7 +2172,7 @@ version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa0d133ddf8b9b3b872acac900157f783e7b879fe9a6bccf389abebbfac45ec1" dependencies = [ - "arrow 58.1.0", + "arrow", "bigdecimal", "chrono", "datafusion-common", @@ -4227,12 +4011,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d3f9f2205199603564127932b89695f52b62322f541d0fc7179d57c2e1c9877" dependencies = [ "ahash", - "arrow-array 58.1.0", - "arrow-buffer 58.1.0", - "arrow-data 58.1.0", - "arrow-ipc 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", "base64", "brotli", "bytes", @@ -4268,14 +4052,13 @@ dependencies = [ "actix-web-static-files", "anyhow", "argon2", - "arrow 58.1.0", - "arrow 59.0.0", - "arrow-array 58.1.0", + "arrow", + "arrow-array", "arrow-flight", - "arrow-ipc 58.1.0", - "arrow-json 58.1.0", - "arrow-schema 58.1.0", - "arrow-select 58.1.0", + "arrow-ipc", + "arrow-json", + "arrow-schema", + "arrow-select", "async-trait", "base64", "byteorder", diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 993a40b10..86c2be877 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -44,6 +44,7 @@ pub mod oidc; pub mod prism_home; pub mod prism_logstream; pub mod query; +pub mod query_context; pub mod rbac; pub mod resource_check; pub mod role; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 0d6941cb8..3da7d6e53 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -58,6 +58,7 @@ impl ParseableServer for QueryServer { web::scope(&base_path()) .service(Server::get_correlation_webscope()) .service(Server::get_query_factory()) + .service(Server::get_query_context_factory()) .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) .service(Server::get_about_factory()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 6a0309075..06703ad7d 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -29,6 +29,7 @@ use crate::handlers::http::max_event_payload_size; use crate::handlers::http::middleware::IntraClusterRequest; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; +use crate::handlers::http::query_context; use crate::handlers::http::targets; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; @@ -80,6 +81,7 @@ impl ParseableServer for Server { web::scope(&base_path()) .service(Self::get_correlation_webscope()) .service(Self::get_query_factory()) + .service(Self::get_query_context_factory()) .service(Self::get_ingest_factory()) .service(Self::get_liveness_factory()) .service(Self::get_readiness_factory()) @@ -424,6 +426,16 @@ impl Server { ) } + // POST "/query/context" ==> Get a page of logs around a selected log line + pub fn get_query_context_factory() -> Resource { + web::resource("/query/context").route( + web::post() + .to(query_context::query_context) + .authorize(Action::Query) + .wrap(IntraClusterRequest), + ) + } + // get the logstream web scope pub fn get_logstream_webscope() -> Scope { web::scope("/logstream") diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index c7641eaac..c7baf8cba 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -115,6 +115,45 @@ pub async fn get_records_and_fields( Ok((Some(records), Some(fields))) } +/// Executes a query after the caller has already loaded and authorized the streams. +/// +/// This keeps structured APIs that fan out into multiple internal SQL queries from repeating +/// distributed stream loading and RBAC checks for every generated query. +pub async fn get_records_and_fields_for_authorized_query( + query_request: &Query, + authorized_tables: &[String], + tenant_id: &Option, +) -> Result<(Option>, Option>), QueryError> { + let mut session_state = QUERY_SESSION.get_ctx().state(); + let time_range = + TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?; + let tables = resolve_stream_names(&query_request.query)?; + if tables + .iter() + .any(|table| !authorized_tables.contains(table)) + { + return Err(QueryError::Unauthorized); + } + + session_state + .config_mut() + .options_mut() + .catalog + .default_schema = tenant_id.as_deref().unwrap_or("public").to_owned(); + + let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?; + let (records, fields) = execute(query, false, tenant_id).await?; + + let records = match records { + Either::Left(vec_rb) => vec_rb, + Either::Right(_) => { + return Err(QueryError::CustomError("Reject streaming response".into())); + } + }; + + Ok((Some(records), Some(fields))) +} + pub async fn query(req: HttpRequest, query_request: Query) -> Result { let mut session_state = QUERY_SESSION.get_ctx().state(); let time_range = diff --git a/src/handlers/http/query_context.rs b/src/handlers/http/query_context.rs new file mode 100644 index 000000000..f8d26fdb2 --- /dev/null +++ b/src/handlers/http/query_context.rs @@ -0,0 +1,1188 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::web::{self, Json}; +use actix_web::{HttpRequest, Responder}; +use arrow_schema::DataType; +use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc}; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use tracing::{Span, debug, info, warn}; + +use crate::alerts::{alert_structs::Conditions, alerts_utils::get_filter_string}; +use crate::event::DEFAULT_TIMESTAMP_KEY; +use crate::handlers::http::query::{ + Query, QueryError, create_streams_for_distributed, get_records_and_fields_for_authorized_query, +}; +use crate::metrics::increment_query_calls_by_date; +use crate::parseable::{DEFAULT_TENANT, PARSEABLE}; +use crate::rbac::Users; +use crate::utils::actix::extract_session_key_from_req; +use crate::utils::arrow::record_batches_to_json; +use crate::utils::time::truncate_to_minute; +use crate::utils::{get_tenant_id_from_request, user_auth_for_datasets}; + +const DEFAULT_LOG_CONTEXT_PAGE_SIZE: u64 = 500; +const LOG_CONTEXT_ANCHORED_DUPLICATE: &str = "first"; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LogContextRequest { + pub stream: String, + pub context_window: String, + pub p_timestamp: String, + pub log: Option, + pub body: Option, + pub message: Option, + pub conditions: Option, + pub page_size: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LogContextResponse { + pub scope: LogContextScope, + pub context_start_time: String, + pub context_end_time: String, + pub limit: u64, + pub anchor_index: u64, + pub duplicate_anchor_count: u64, + pub anchored_duplicate: &'static str, + pub records: Vec, + pub queries: LogContextQueries, +} + +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum LogContextScope { + ContextWindow, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LogContextQueries { + pub previous: Option, + pub next: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LogContextQueryPayload { + pub query: String, + pub start_time: String, + pub end_time: String, + pub send_null: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct LogContextMatchField { + name: String, + value: String, +} + +#[derive(Debug, Clone)] +struct LogContextCursor { + timestamp: DateTime, + match_field: LogContextMatchField, +} + +#[tracing::instrument( + name = "query_context", + skip(req, context_request), + fields( + stream = tracing::field::Empty, + tenant = tracing::field::Empty, + page_size = tracing::field::Empty, + scope = tracing::field::Empty + ) +)] +pub async fn query_context( + req: HttpRequest, + context_request: Json, +) -> Result { + let context_request = context_request.into_inner(); + let tenant_id = get_tenant_id_from_request(&req); + let span = Span::current(); + span.record("stream", tracing::field::display(&context_request.stream)); + span.record("tenant", tracing::field::debug(&tenant_id)); + info!( + has_log = context_request.log.is_some(), + has_body = context_request.body.is_some(), + has_message = context_request.message.is_some(), + has_conditions = context_request.conditions.is_some(), + "query context request received" + ); + + let creds = extract_session_key_from_req(&req)?; + let permissions = Users.get_permissions(&creds); + + create_streams_for_distributed(vec![context_request.stream.clone()], &tenant_id).await?; + let authorized_streams = vec![context_request.stream.clone()]; + user_auth_for_datasets( + &permissions, + std::slice::from_ref(&context_request.stream), + &tenant_id, + ) + .await?; + + let page_size = normalize_log_context_page_size(context_request.page_size)?; + span.record("page_size", page_size); + let anchor_timestamp = parse_log_context_timestamp(&context_request.p_timestamp)?; + let (context_start_time, context_end_time) = + log_context_bounds(anchor_timestamp, &context_request.context_window)?; + debug!( + page_size, + anchor_timestamp = %anchor_timestamp, + context_start_time = %context_start_time, + context_end_time = %context_end_time, + "query context request normalized" + ); + + let stream = PARSEABLE.get_stream(&context_request.stream, &tenant_id)?; + let schema = stream.get_schema(); + validate_log_context_schema(schema.as_ref(), DEFAULT_TIMESTAMP_KEY)?; + let match_fields = normalize_log_context_match_fields( + &context_request.log, + &context_request.body, + &context_request.message, + schema.as_ref(), + )?; + let additional_filter = log_context_additional_filter(&context_request.conditions)?; + + let context_start_time_str = format_log_context_api_time(context_start_time); + let context_end_time_str = format_log_context_api_time(context_end_time); + + let anchor_count_query = build_log_context_anchor_count_query( + &context_request.stream, + anchor_timestamp, + context_start_time, + context_end_time, + &match_fields, + additional_filter.as_deref(), + ); + let duplicate_anchor_count = execute_log_context_anchor_count( + &anchor_count_query, + &authorized_streams, + &context_start_time_str, + &context_end_time_str, + &tenant_id, + ) + .await?; + debug!( + duplicate_anchor_count, + "query context anchor count resolved" + ); + + let scope = LogContextScope::ContextWindow; + span.record("scope", tracing::field::debug(&scope)); + info!( + scope = ?scope, + duplicate_anchor_count, + "query context scope selected" + ); + + if duplicate_anchor_count == 0 { + warn!( + scope = ?scope, + "query context anchor row not found" + ); + return Err(QueryError::CustomError( + "No log row matched the provided pTimestamp and log/body/message value".to_string(), + )); + } + + let fetch_limit = page_size; + let newer_payload = build_log_context_newer_query_payload( + &context_request.stream, + anchor_timestamp, + context_start_time, + context_end_time, + &match_fields, + additional_filter.as_deref(), + &context_start_time_str, + &context_end_time_str, + fetch_limit, + ); + let older_payload = build_log_context_anchor_and_older_query_payload( + &context_request.stream, + anchor_timestamp, + context_start_time, + context_end_time, + &match_fields, + additional_filter.as_deref(), + &context_start_time_str, + &context_end_time_str, + fetch_limit, + ); + + let (newer_records, older_records) = tokio::try_join!( + execute_log_context_rows(&newer_payload, &authorized_streams, &tenant_id), + execute_log_context_rows(&older_payload, &authorized_streams, &tenant_id), + )?; + let (records, anchor_index) = + build_log_context_records_window(newer_records, older_records, page_size)?; + + let queries = build_log_context_cursor_queries( + &context_request.stream, + context_start_time, + context_end_time, + &match_fields, + additional_filter.as_deref(), + &context_start_time_str, + &context_end_time_str, + page_size, + &records, + )?; + + info!( + scope = ?scope, + anchor_index, + duplicate_anchor_count, + record_count = records.len(), + "query context rows fetched" + ); + + let current_date = chrono::Utc::now().date_naive().to_string(); + increment_query_calls_by_date( + ¤t_date, + tenant_id.as_deref().unwrap_or(DEFAULT_TENANT), + ); + + Ok(web::Json(LogContextResponse { + scope, + context_start_time: context_start_time_str, + context_end_time: context_end_time_str, + limit: page_size, + anchor_index, + duplicate_anchor_count, + anchored_duplicate: LOG_CONTEXT_ANCHORED_DUPLICATE, + records, + queries, + })) +} + +fn normalize_log_context_page_size(page_size: Option) -> Result { + let page_size = page_size.unwrap_or(DEFAULT_LOG_CONTEXT_PAGE_SIZE); + if page_size == 0 { + return Err(QueryError::CustomError( + "pageSize must be greater than 0".to_string(), + )); + } + + Ok(page_size.min(DEFAULT_LOG_CONTEXT_PAGE_SIZE)) +} + +fn parse_log_context_timestamp(raw: &str) -> Result, QueryError> { + let raw = raw.trim(); + let timestamp = DateTime::parse_from_rfc3339(raw) + .map(|timestamp| timestamp.with_timezone(&Utc)) + .or_else(|rfc3339_err| { + parse_log_context_naive_utc_timestamp(raw) + .map(|timestamp| DateTime::from_naive_utc_and_offset(timestamp, Utc)) + .map_err(|_| QueryError::CustomError(format!("Invalid pTimestamp: {rfc3339_err}"))) + })?; + + DateTime::from_timestamp_millis(timestamp.timestamp_millis()).ok_or_else(|| { + QueryError::CustomError("pTimestamp is outside the supported range".to_string()) + }) +} + +fn parse_log_context_naive_utc_timestamp(raw: &str) -> Result { + NaiveDateTime::parse_from_str(raw, "%Y-%m-%dT%H:%M:%S%.f") + .or_else(|_| NaiveDateTime::parse_from_str(raw, "%Y-%m-%d %H:%M:%S%.f")) +} + +fn log_context_bounds( + anchor_timestamp: DateTime, + context_window: &str, +) -> Result<(DateTime, DateTime), QueryError> { + let duration = humantime::parse_duration(context_window) + .map_err(|err| QueryError::CustomError(format!("Invalid contextWindow: {err}")))?; + if duration.is_zero() { + return Err(QueryError::CustomError( + "contextWindow must be greater than 0".to_string(), + )); + } + + let duration = chrono::Duration::from_std(duration) + .map_err(|err| QueryError::CustomError(format!("Invalid contextWindow: {err}")))?; + let start = truncate_to_minute(anchor_timestamp - duration); + let mut end = truncate_to_minute(anchor_timestamp + duration); + + if start >= end { + end = start + chrono::Duration::minutes(1); + } + + Ok((start, end)) +} + +fn normalize_log_context_match_fields( + log: &Option, + body: &Option, + message: &Option, + schema: &arrow_schema::Schema, +) -> Result, QueryError> { + let fields = [("log", log), ("body", body), ("message", message)] + .into_iter() + .filter_map(|(field_name, value)| value.as_ref().map(|value| (field_name, value))) + .map(|(name, value)| { + validate_log_context_match_field_schema(schema, name)?; + Ok(LogContextMatchField { + name: name.to_string(), + value: value.clone(), + }) + }) + .collect::, QueryError>>()?; + + if fields.is_empty() { + return Err(QueryError::CustomError( + "Request must include exactly one of log, body, or message".to_string(), + )); + } + + if fields.len() > 1 { + return Err(QueryError::CustomError( + "Request must include exactly one of log, body, or message".to_string(), + )); + } + + Ok(fields) +} + +fn validate_log_context_schema( + schema: &arrow_schema::Schema, + field_name: &str, +) -> Result<(), QueryError> { + if schema + .fields() + .iter() + .any(|field| field.name() == field_name) + { + return Ok(()); + } + + Err(QueryError::CustomError(format!( + "Field '{field_name}' does not exist in stream schema" + ))) +} + +fn validate_log_context_match_field_schema( + schema: &arrow_schema::Schema, + field_name: &str, +) -> Result<(), QueryError> { + let field = schema + .fields() + .iter() + .find(|field| field.name() == field_name) + .ok_or_else(|| { + QueryError::CustomError(format!( + "Field '{field_name}' does not exist in stream schema" + )) + })?; + + match field.data_type() { + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Ok(()), + data_type => Err(QueryError::CustomError(format!( + "Field '{field_name}' must be a string column for log context matching; found {data_type}" + ))), + } +} + +fn log_context_additional_filter( + conditions: &Option, +) -> Result, QueryError> { + conditions + .as_ref() + .map(|conditions| { + get_filter_string(conditions) + .map(|filter| format!("({filter})")) + .map_err(|err| QueryError::CustomError(format!("Invalid conditions: {err}"))) + }) + .transpose() +} + +fn build_log_context_anchor_count_query( + stream: &str, + anchor_timestamp: DateTime, + context_start_time: DateTime, + context_end_time: DateTime, + match_fields: &[LogContextMatchField], + additional_filter: Option<&str>, +) -> String { + let scope_filter = + build_log_context_scope_filter(context_start_time, context_end_time, additional_filter); + let anchor_match_predicate = + build_log_context_anchor_match_predicate(anchor_timestamp, match_fields); + let stream = quote_sql_identifier(stream); + + format!( + r#"SELECT + COUNT(*) AS duplicate_anchor_count +FROM {stream} +WHERE {scope_filter} AND ({anchor_match_predicate})"# + ) +} + +fn build_log_context_newer_query_payload( + stream: &str, + anchor_timestamp: DateTime, + context_start_time: DateTime, + context_end_time: DateTime, + match_fields: &[LogContextMatchField], + additional_filter: Option<&str>, + start_time: &str, + end_time: &str, + limit: u64, +) -> LogContextQueryPayload { + LogContextQueryPayload { + query: build_log_context_neighbor_query( + stream, + context_start_time, + context_end_time, + &LogContextCursor { + timestamp: anchor_timestamp, + match_field: match_fields[0].clone(), + }, + LogContextSeekDirection::Newer, + false, + match_fields, + additional_filter, + limit, + ), + start_time: start_time.to_string(), + end_time: end_time.to_string(), + send_null: false, + } +} + +fn build_log_context_anchor_and_older_query_payload( + stream: &str, + anchor_timestamp: DateTime, + context_start_time: DateTime, + context_end_time: DateTime, + match_fields: &[LogContextMatchField], + additional_filter: Option<&str>, + start_time: &str, + end_time: &str, + limit: u64, +) -> LogContextQueryPayload { + LogContextQueryPayload { + query: build_log_context_anchor_and_older_query( + stream, + anchor_timestamp, + context_start_time, + context_end_time, + match_fields, + additional_filter, + limit, + ), + start_time: start_time.to_string(), + end_time: end_time.to_string(), + send_null: false, + } +} + +#[derive(Debug, Clone, Copy)] +enum LogContextSeekDirection { + Newer, + Older, +} + +fn build_log_context_anchor_and_older_query( + stream: &str, + anchor_timestamp: DateTime, + context_start_time: DateTime, + context_end_time: DateTime, + match_fields: &[LogContextMatchField], + additional_filter: Option<&str>, + limit: u64, +) -> String { + let anchor_cursor = LogContextCursor { + timestamp: anchor_timestamp, + match_field: match_fields[0].clone(), + }; + let scope_filter = + build_log_context_scope_filter(context_start_time, context_end_time, additional_filter); + let predicate = build_log_context_anchor_and_older_predicate(&anchor_cursor); + let order_by = build_log_context_order_by(match_fields); + let stream = quote_sql_identifier(stream); + + format!( + "SELECT * FROM {stream} WHERE ({scope_filter}) AND ({predicate}) {order_by} LIMIT {limit}" + ) +} + +#[allow(clippy::too_many_arguments)] +fn build_log_context_neighbor_query( + stream: &str, + context_start_time: DateTime, + context_end_time: DateTime, + cursor: &LogContextCursor, + direction: LogContextSeekDirection, + wrap_for_display_order: bool, + match_fields: &[LogContextMatchField], + additional_filter: Option<&str>, + limit: u64, +) -> String { + let scope_filter = + build_log_context_scope_filter(context_start_time, context_end_time, additional_filter); + let predicate = build_log_context_cursor_predicate(cursor, direction); + let order_by = match direction { + LogContextSeekDirection::Newer => build_log_context_reverse_order_by(match_fields), + LogContextSeekDirection::Older => build_log_context_order_by(match_fields), + }; + let stream = quote_sql_identifier(stream); + let query = format!( + "SELECT * FROM {stream} WHERE ({scope_filter}) AND ({predicate}) {order_by} LIMIT {limit}" + ); + + if wrap_for_display_order { + let display_order_by = build_log_context_order_by(match_fields); + format!("SELECT * FROM ({query}) AS log_context_seek_page {display_order_by}") + } else { + query + } +} + +fn build_log_context_cursor_query_payload( + stream: &str, + context_start_time: DateTime, + context_end_time: DateTime, + match_fields: &[LogContextMatchField], + additional_filter: Option<&str>, + start_time: &str, + end_time: &str, + limit: u64, + cursor: &LogContextCursor, + direction: LogContextSeekDirection, +) -> LogContextQueryPayload { + LogContextQueryPayload { + query: build_log_context_neighbor_query( + stream, + context_start_time, + context_end_time, + cursor, + direction, + matches!(direction, LogContextSeekDirection::Newer), + match_fields, + additional_filter, + limit, + ), + start_time: start_time.to_string(), + end_time: end_time.to_string(), + send_null: false, + } +} + +#[allow(clippy::too_many_arguments)] +fn build_log_context_cursor_queries( + stream: &str, + context_start_time: DateTime, + context_end_time: DateTime, + match_fields: &[LogContextMatchField], + additional_filter: Option<&str>, + start_time: &str, + end_time: &str, + limit: u64, + records: &[Value], +) -> Result { + let previous = records + .first() + .map(|record| log_context_cursor_from_record(record, &match_fields[0])) + .transpose()? + .map(|cursor| { + build_log_context_cursor_query_payload( + stream, + context_start_time, + context_end_time, + match_fields, + additional_filter, + start_time, + end_time, + limit, + &cursor, + LogContextSeekDirection::Newer, + ) + }); + let next = records + .last() + .map(|record| log_context_cursor_from_record(record, &match_fields[0])) + .transpose()? + .map(|cursor| { + build_log_context_cursor_query_payload( + stream, + context_start_time, + context_end_time, + match_fields, + additional_filter, + start_time, + end_time, + limit, + &cursor, + LogContextSeekDirection::Older, + ) + }); + + Ok(LogContextQueries { previous, next }) +} + +fn build_log_context_records_window( + newer_records: Vec, + older_records: Vec, + page_size: u64, +) -> Result<(Vec, u64), QueryError> { + if older_records.is_empty() { + return Err(QueryError::CustomError( + "Anchor row was not returned by the context query".to_string(), + )); + } + + let page_size = usize::try_from(page_size) + .map_err(|_| QueryError::CustomError("pageSize is too large".to_string()))?; + let (newer_take, older_take) = + log_context_window_counts(newer_records.len(), older_records.len(), page_size); + let mut newer_records = newer_records + .into_iter() + .take(newer_take) + .collect::>(); + newer_records.reverse(); + + let anchor_index = newer_records.len() as u64; + let mut records = newer_records; + records.extend(older_records.into_iter().take(older_take)); + + Ok((records, anchor_index)) +} + +fn log_context_window_counts( + newer_len: usize, + older_len: usize, + page_size: usize, +) -> (usize, usize) { + let target_newer = page_size / 2; + let mut newer_take = newer_len.min(target_newer); + let older_take = older_len.min(page_size.saturating_sub(newer_take)); + if newer_take + older_take < page_size { + newer_take = newer_len.min(newer_take + page_size - newer_take - older_take); + } + + (newer_take, older_take) +} + +fn build_log_context_scope_filter( + context_start_time: DateTime, + context_end_time: DateTime, + additional_filter: Option<&str>, +) -> String { + let timestamp_column = quote_sql_identifier(DEFAULT_TIMESTAMP_KEY); + let time_filter = format!( + "{timestamp_column} >= {} AND {timestamp_column} < {}", + timestamp_sql_literal(context_start_time), + timestamp_sql_literal(context_end_time) + ); + + match additional_filter { + Some(filter) => format!("({time_filter}) AND {filter}"), + None => time_filter, + } +} + +fn build_log_context_anchor_match_predicate( + anchor_timestamp: DateTime, + match_fields: &[LogContextMatchField], +) -> String { + let mut predicates = vec![format!( + "{} = {}", + quote_sql_identifier(DEFAULT_TIMESTAMP_KEY), + timestamp_sql_literal(anchor_timestamp) + )]; + + predicates.extend(match_fields.iter().map(|field| { + format!( + "{} = {}", + quote_sql_identifier(&field.name), + quote_sql_string_literal(&field.value) + ) + })); + + predicates.join(" AND ") +} + +fn build_log_context_anchor_and_older_predicate(cursor: &LogContextCursor) -> String { + let timestamp_column = quote_sql_identifier(DEFAULT_TIMESTAMP_KEY); + let timestamp = timestamp_sql_literal(cursor.timestamp); + let field_column = quote_sql_identifier(&cursor.match_field.name); + let field_value = quote_sql_string_literal(&cursor.match_field.value); + + format!( + "{timestamp_column} < {timestamp} OR ({timestamp_column} = {timestamp} AND {field_column} >= {field_value})" + ) +} + +fn build_log_context_cursor_predicate( + cursor: &LogContextCursor, + direction: LogContextSeekDirection, +) -> String { + let timestamp_column = quote_sql_identifier(DEFAULT_TIMESTAMP_KEY); + let timestamp = timestamp_sql_literal(cursor.timestamp); + let field_column = quote_sql_identifier(&cursor.match_field.name); + let field_value = quote_sql_string_literal(&cursor.match_field.value); + + match direction { + LogContextSeekDirection::Newer => format!( + "{timestamp_column} > {timestamp} OR ({timestamp_column} = {timestamp} AND {field_column} < {field_value})" + ), + LogContextSeekDirection::Older => format!( + "{timestamp_column} < {timestamp} OR ({timestamp_column} = {timestamp} AND {field_column} > {field_value})" + ), + } +} + +fn build_log_context_order_by(match_fields: &[LogContextMatchField]) -> String { + let mut order_columns = vec![format!( + "{} DESC", + quote_sql_identifier(DEFAULT_TIMESTAMP_KEY) + )]; + order_columns.extend( + match_fields + .iter() + .map(|field| format!("{} ASC", quote_sql_identifier(&field.name))), + ); + + format!("ORDER BY {}", order_columns.into_iter().join(", ")) +} + +fn build_log_context_reverse_order_by(match_fields: &[LogContextMatchField]) -> String { + let mut order_columns = vec![format!( + "{} ASC", + quote_sql_identifier(DEFAULT_TIMESTAMP_KEY) + )]; + order_columns.extend( + match_fields + .iter() + .map(|field| format!("{} DESC", quote_sql_identifier(&field.name))), + ); + + format!("ORDER BY {}", order_columns.into_iter().join(", ")) +} + +#[tracing::instrument( + name = "query_context.execute_anchor_count", + skip(sql, authorized_streams, tenant_id), + fields( + tenant = tracing::field::Empty, + start_time = %start_time, + end_time = %end_time, + sql_len = sql.len(), + authorized_stream_count = authorized_streams.len() + ) +)] +async fn execute_log_context_anchor_count( + sql: &str, + authorized_streams: &[String], + start_time: &str, + end_time: &str, + tenant_id: &Option, +) -> Result { + Span::current().record("tenant", tracing::field::debug(tenant_id)); + debug!("query context anchor count query starting"); + + let query_request = Query { + query: sql.to_string(), + start_time: start_time.to_string(), + end_time: end_time.to_string(), + send_null: false, + fields: false, + streaming: false, + filter_tags: None, + }; + + let (records, _) = + get_records_and_fields_for_authorized_query(&query_request, authorized_streams, tenant_id) + .await?; + let records = records.unwrap_or_default(); + let rows = record_batches_to_json(&records)?; + let row = rows + .first() + .ok_or_else(|| QueryError::CustomError("No anchor count returned".to_string()))?; + let duplicate_anchor_count = json_u64_field(row, "duplicate_anchor_count")?; + debug!( + duplicate_anchor_count, + "query context anchor count query completed" + ); + + Ok(duplicate_anchor_count) +} + +#[tracing::instrument( + name = "query_context.execute_rows", + skip(payload, authorized_streams, tenant_id), + fields( + tenant = tracing::field::Empty, + start_time = %payload.start_time, + end_time = %payload.end_time, + sql_len = payload.query.len(), + authorized_stream_count = authorized_streams.len() + ) +)] +async fn execute_log_context_rows( + payload: &LogContextQueryPayload, + authorized_streams: &[String], + tenant_id: &Option, +) -> Result, QueryError> { + Span::current().record("tenant", tracing::field::debug(tenant_id)); + debug!("query context row query starting"); + + let query_request = Query { + query: payload.query.clone(), + start_time: payload.start_time.clone(), + end_time: payload.end_time.clone(), + send_null: payload.send_null, + fields: false, + streaming: false, + filter_tags: None, + }; + + let (records, _) = + get_records_and_fields_for_authorized_query(&query_request, authorized_streams, tenant_id) + .await?; + let records = records.unwrap_or_default(); + let records: Vec = record_batches_to_json(&records)? + .into_iter() + .map(Value::Object) + .collect(); + + debug!( + record_count = records.len(), + "query context row query completed" + ); + + Ok(records) +} + +fn json_u64_field(row: &Map, field: &str) -> Result { + let value = row + .get(field) + .ok_or_else(|| QueryError::CustomError(format!("Missing field '{field}'")))?; + + match value { + Value::Number(number) => { + if let Some(value) = number.as_u64() { + return Ok(value); + } + if let Some(value) = number.as_i64() + && value >= 0 + { + return Ok(value as u64); + } + Err(QueryError::CustomError(format!( + "Field '{field}' is not a valid unsigned integer" + ))) + } + Value::Null => Ok(0), + _ => Err(QueryError::CustomError(format!( + "Field '{field}' is not a valid unsigned integer" + ))), + } +} + +fn log_context_cursor_from_record( + record: &Value, + match_field: &LogContextMatchField, +) -> Result { + let record = record.as_object().ok_or_else(|| { + QueryError::CustomError("Log context record is not an object".to_string()) + })?; + let timestamp = record + .get(DEFAULT_TIMESTAMP_KEY) + .and_then(Value::as_str) + .ok_or_else(|| { + QueryError::CustomError(format!("Missing field '{DEFAULT_TIMESTAMP_KEY}' in record")) + }) + .and_then(parse_log_context_timestamp)?; + let value = record + .get(&match_field.name) + .and_then(Value::as_str) + .ok_or_else(|| { + QueryError::CustomError(format!("Missing field '{}' in record", match_field.name)) + })? + .to_string(); + + Ok(LogContextCursor { + timestamp, + match_field: LogContextMatchField { + name: match_field.name.clone(), + value, + }, + }) +} + +fn format_log_context_api_time(timestamp: DateTime) -> String { + timestamp.to_rfc3339_opts(SecondsFormat::Secs, true) +} + +fn timestamp_sql_literal(timestamp: DateTime) -> String { + format!( + "TIMESTAMP '{}'", + timestamp.naive_utc().format("%Y-%m-%d %H:%M:%S%.3f") + ) +} + +fn quote_sql_identifier(identifier: &str) -> String { + format!("\"{}\"", identifier.replace('"', "\"\"")) +} + +fn quote_sql_string_literal(value: &str) -> String { + format!("'{}'", value.replace('\'', "''")) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_schema::{DataType, Field, Schema, TimeUnit}; + + fn schema_with(fields: &[&str]) -> Schema { + Schema::new( + fields + .iter() + .map(|field| { + if *field == DEFAULT_TIMESTAMP_KEY { + Field::new( + *field, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ) + } else { + Field::new(*field, DataType::Utf8, true) + } + }) + .collect::>(), + ) + } + + fn anchor_timestamp() -> DateTime { + parse_log_context_timestamp("2026-06-17T10:15:42.123456Z").unwrap() + } + + #[test] + fn log_context_timestamp_parser_accepts_request_and_query_result_formats() { + assert_eq!( + parse_log_context_timestamp("2026-06-18T07:39:59.995Z").unwrap(), + parse_log_context_timestamp("2026-06-18T07:39:59.995").unwrap() + ); + assert_eq!( + parse_log_context_timestamp("2026-06-18T07:39:59.995Z").unwrap(), + parse_log_context_timestamp("2026-06-18 07:39:59.995").unwrap() + ); + } + + #[test] + fn log_context_window_counts_center_anchor_and_fill_edges() { + assert_eq!(log_context_window_counts(250, 250, 500), (250, 250)); + assert_eq!(log_context_window_counts(10, 1000, 500), (10, 490)); + assert_eq!(log_context_window_counts(1000, 10, 500), (490, 10)); + assert_eq!(log_context_window_counts(10, 10, 500), (10, 10)); + } + + #[test] + fn log_context_page_size_defaults_and_clamps() { + assert_eq!( + normalize_log_context_page_size(None).unwrap(), + DEFAULT_LOG_CONTEXT_PAGE_SIZE + ); + assert_eq!(normalize_log_context_page_size(Some(1)).unwrap(), 1); + assert_eq!( + normalize_log_context_page_size(Some(DEFAULT_LOG_CONTEXT_PAGE_SIZE + 1)).unwrap(), + DEFAULT_LOG_CONTEXT_PAGE_SIZE + ); + assert!(normalize_log_context_page_size(Some(0)).is_err()); + } + + #[test] + fn log_context_bounds_apply_window_and_truncate_to_minute() { + let (start, end) = log_context_bounds(anchor_timestamp(), "1m").unwrap(); + assert_eq!(format_log_context_api_time(start), "2026-06-17T10:14:00Z"); + assert_eq!(format_log_context_api_time(end), "2026-06-17T10:16:00Z"); + + let (start, end) = log_context_bounds(anchor_timestamp(), "5s").unwrap(); + assert_eq!(format_log_context_api_time(start), "2026-06-17T10:15:00Z"); + assert_eq!(format_log_context_api_time(end), "2026-06-17T10:16:00Z"); + } + + #[test] + fn log_context_match_fields_accept_exactly_one_anchor_field() { + let schema = schema_with(&[DEFAULT_TIMESTAMP_KEY, "body", "log", "message"]); + + let log_fields = normalize_log_context_match_fields( + &Some("log value".to_string()), + &None, + &None, + &schema, + ) + .unwrap(); + assert_eq!( + log_fields, + vec![LogContextMatchField { + name: "log".to_string(), + value: "log value".to_string(), + }] + ); + + let body_fields = normalize_log_context_match_fields( + &None, + &Some("body value".to_string()), + &None, + &schema, + ) + .unwrap(); + assert_eq!( + body_fields, + vec![LogContextMatchField { + name: "body".to_string(), + value: "body value".to_string(), + }] + ); + + let message_fields = normalize_log_context_match_fields( + &None, + &None, + &Some("message value".to_string()), + &schema, + ) + .unwrap(); + assert_eq!( + message_fields, + vec![LogContextMatchField { + name: "message".to_string(), + value: "message value".to_string(), + }] + ); + } + + #[test] + fn log_context_match_fields_reject_missing_multiple_and_absent_schema_fields() { + let full_schema = schema_with(&[DEFAULT_TIMESTAMP_KEY, "body", "log", "message"]); + let log_only_schema = schema_with(&[DEFAULT_TIMESTAMP_KEY, "log"]); + + assert!(normalize_log_context_match_fields(&None, &None, &None, &full_schema).is_err()); + assert!( + normalize_log_context_match_fields( + &Some("log value".to_string()), + &Some("body value".to_string()), + &None, + &full_schema, + ) + .is_err() + ); + assert!( + normalize_log_context_match_fields( + &None, + &Some("body value".to_string()), + &None, + &log_only_schema + ) + .is_err() + ); + + let numeric_message_schema = Schema::new(vec![ + Field::new( + DEFAULT_TIMESTAMP_KEY, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new("message", DataType::Int64, true), + ]); + assert!( + normalize_log_context_match_fields( + &None, + &None, + &Some("message value".to_string()), + &numeric_message_schema, + ) + .is_err() + ); + } + + #[test] + fn log_context_anchor_count_sql_only_counts_anchor_duplicates() { + let anchor = anchor_timestamp(); + let (start, end) = log_context_bounds(anchor, "1m").unwrap(); + let match_fields = vec![LogContextMatchField { + name: "message".to_string(), + value: "alpha".to_string(), + }]; + + let sql = + build_log_context_anchor_count_query("logs", anchor, start, end, &match_fields, None); + + assert!(sql.contains("COUNT(*) AS duplicate_anchor_count")); + assert!(!sql.contains("rows_before")); + assert!(!sql.contains("COUNT(*) AS total")); + assert!(sql.contains("\"p_timestamp\" = TIMESTAMP '2026-06-17 10:15:42.123'")); + assert!(sql.contains("\"message\" = 'alpha'")); + assert!(!sql.contains("\"log\" = 'alpha' AND \"body\"")); + } + + #[test] + fn log_context_cursor_sql_builds_previous_and_next_pages() { + let anchor = anchor_timestamp(); + let (start, end) = log_context_bounds(anchor, "1m").unwrap(); + let match_fields = vec![LogContextMatchField { + name: "message".to_string(), + value: "alpha".to_string(), + }]; + let cursor = LogContextCursor { + timestamp: anchor, + match_field: match_fields[0].clone(), + }; + + let previous_sql = build_log_context_neighbor_query( + "logs", + start, + end, + &cursor, + LogContextSeekDirection::Newer, + true, + &match_fields, + None, + 500, + ); + assert!(previous_sql.contains("\"p_timestamp\" > TIMESTAMP '2026-06-17 10:15:42.123'")); + assert!(previous_sql.contains("\"message\" < 'alpha'")); + assert!(previous_sql.contains("ORDER BY \"p_timestamp\" ASC, \"message\" DESC LIMIT 500")); + assert!(previous_sql.ends_with("ORDER BY \"p_timestamp\" DESC, \"message\" ASC")); + assert!(!previous_sql.contains("OFFSET")); + + let next_sql = build_log_context_neighbor_query( + "logs", + start, + end, + &cursor, + LogContextSeekDirection::Older, + false, + &match_fields, + None, + 500, + ); + assert!(next_sql.contains("\"p_timestamp\" < TIMESTAMP '2026-06-17 10:15:42.123'")); + assert!(next_sql.contains("\"message\" > 'alpha'")); + assert!(next_sql.contains("ORDER BY \"p_timestamp\" DESC, \"message\" ASC LIMIT 500")); + assert!(!next_sql.contains("OFFSET")); + } +} From 046a454dd26c535209d65dc9855e9bdd547319d2 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 19 Jun 2026 14:01:22 +0700 Subject: [PATCH 2/3] query string to have limit and offset --- src/handlers/http/query_context.rs | 164 ++++++++++++++++++----------- 1 file changed, 100 insertions(+), 64 deletions(-) diff --git a/src/handlers/http/query_context.rs b/src/handlers/http/query_context.rs index f8d26fdb2..76ba6d808 100644 --- a/src/handlers/http/query_context.rs +++ b/src/handlers/http/query_context.rs @@ -44,7 +44,7 @@ const LOG_CONTEXT_ANCHORED_DUPLICATE: &str = "first"; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct LogContextRequest { - pub stream: String, + pub dataset: String, pub context_window: String, pub p_timestamp: String, pub log: Option, @@ -88,6 +88,7 @@ pub struct LogContextQueryPayload { pub start_time: String, pub end_time: String, pub send_null: bool, + pub reverse_records: bool, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -106,7 +107,7 @@ struct LogContextCursor { name = "query_context", skip(req, context_request), fields( - stream = tracing::field::Empty, + dataset = tracing::field::Empty, tenant = tracing::field::Empty, page_size = tracing::field::Empty, scope = tracing::field::Empty @@ -119,7 +120,7 @@ pub async fn query_context( let context_request = context_request.into_inner(); let tenant_id = get_tenant_id_from_request(&req); let span = Span::current(); - span.record("stream", tracing::field::display(&context_request.stream)); + span.record("dataset", tracing::field::display(&context_request.dataset)); span.record("tenant", tracing::field::debug(&tenant_id)); info!( has_log = context_request.log.is_some(), @@ -132,11 +133,11 @@ pub async fn query_context( let creds = extract_session_key_from_req(&req)?; let permissions = Users.get_permissions(&creds); - create_streams_for_distributed(vec![context_request.stream.clone()], &tenant_id).await?; - let authorized_streams = vec![context_request.stream.clone()]; + create_streams_for_distributed(vec![context_request.dataset.clone()], &tenant_id).await?; + let authorized_datasets = vec![context_request.dataset.clone()]; user_auth_for_datasets( &permissions, - std::slice::from_ref(&context_request.stream), + std::slice::from_ref(&context_request.dataset), &tenant_id, ) .await?; @@ -154,8 +155,8 @@ pub async fn query_context( "query context request normalized" ); - let stream = PARSEABLE.get_stream(&context_request.stream, &tenant_id)?; - let schema = stream.get_schema(); + let dataset = PARSEABLE.get_stream(&context_request.dataset, &tenant_id)?; + let schema = dataset.get_schema(); validate_log_context_schema(schema.as_ref(), DEFAULT_TIMESTAMP_KEY)?; let match_fields = normalize_log_context_match_fields( &context_request.log, @@ -169,7 +170,7 @@ pub async fn query_context( let context_end_time_str = format_log_context_api_time(context_end_time); let anchor_count_query = build_log_context_anchor_count_query( - &context_request.stream, + &context_request.dataset, anchor_timestamp, context_start_time, context_end_time, @@ -178,7 +179,7 @@ pub async fn query_context( ); let duplicate_anchor_count = execute_log_context_anchor_count( &anchor_count_query, - &authorized_streams, + &authorized_datasets, &context_start_time_str, &context_end_time_str, &tenant_id, @@ -209,7 +210,7 @@ pub async fn query_context( let fetch_limit = page_size; let newer_payload = build_log_context_newer_query_payload( - &context_request.stream, + &context_request.dataset, anchor_timestamp, context_start_time, context_end_time, @@ -220,7 +221,7 @@ pub async fn query_context( fetch_limit, ); let older_payload = build_log_context_anchor_and_older_query_payload( - &context_request.stream, + &context_request.dataset, anchor_timestamp, context_start_time, context_end_time, @@ -232,14 +233,14 @@ pub async fn query_context( ); let (newer_records, older_records) = tokio::try_join!( - execute_log_context_rows(&newer_payload, &authorized_streams, &tenant_id), - execute_log_context_rows(&older_payload, &authorized_streams, &tenant_id), + execute_log_context_rows(&newer_payload, &authorized_datasets, &tenant_id), + execute_log_context_rows(&older_payload, &authorized_datasets, &tenant_id), )?; let (records, anchor_index) = build_log_context_records_window(newer_records, older_records, page_size)?; let queries = build_log_context_cursor_queries( - &context_request.stream, + &context_request.dataset, context_start_time, context_end_time, &match_fields, @@ -378,7 +379,7 @@ fn validate_log_context_schema( } Err(QueryError::CustomError(format!( - "Field '{field_name}' does not exist in stream schema" + "Field '{field_name}' does not exist in dataset schema" ))) } @@ -392,7 +393,7 @@ fn validate_log_context_match_field_schema( .find(|field| field.name() == field_name) .ok_or_else(|| { QueryError::CustomError(format!( - "Field '{field_name}' does not exist in stream schema" + "Field '{field_name}' does not exist in dataset schema" )) })?; @@ -418,7 +419,7 @@ fn log_context_additional_filter( } fn build_log_context_anchor_count_query( - stream: &str, + dataset: &str, anchor_timestamp: DateTime, context_start_time: DateTime, context_end_time: DateTime, @@ -429,18 +430,18 @@ fn build_log_context_anchor_count_query( build_log_context_scope_filter(context_start_time, context_end_time, additional_filter); let anchor_match_predicate = build_log_context_anchor_match_predicate(anchor_timestamp, match_fields); - let stream = quote_sql_identifier(stream); + let dataset = quote_sql_identifier(dataset); format!( r#"SELECT COUNT(*) AS duplicate_anchor_count -FROM {stream} +FROM {dataset} WHERE {scope_filter} AND ({anchor_match_predicate})"# ) } fn build_log_context_newer_query_payload( - stream: &str, + dataset: &str, anchor_timestamp: DateTime, context_start_time: DateTime, context_end_time: DateTime, @@ -452,7 +453,7 @@ fn build_log_context_newer_query_payload( ) -> LogContextQueryPayload { LogContextQueryPayload { query: build_log_context_neighbor_query( - stream, + dataset, context_start_time, context_end_time, &LogContextCursor { @@ -460,19 +461,20 @@ fn build_log_context_newer_query_payload( match_field: match_fields[0].clone(), }, LogContextSeekDirection::Newer, - false, match_fields, additional_filter, - limit, + Some(limit), + None, ), start_time: start_time.to_string(), end_time: end_time.to_string(), send_null: false, + reverse_records: false, } } fn build_log_context_anchor_and_older_query_payload( - stream: &str, + dataset: &str, anchor_timestamp: DateTime, context_start_time: DateTime, context_end_time: DateTime, @@ -484,7 +486,7 @@ fn build_log_context_anchor_and_older_query_payload( ) -> LogContextQueryPayload { LogContextQueryPayload { query: build_log_context_anchor_and_older_query( - stream, + dataset, anchor_timestamp, context_start_time, context_end_time, @@ -495,6 +497,7 @@ fn build_log_context_anchor_and_older_query_payload( start_time: start_time.to_string(), end_time: end_time.to_string(), send_null: false, + reverse_records: false, } } @@ -505,7 +508,7 @@ enum LogContextSeekDirection { } fn build_log_context_anchor_and_older_query( - stream: &str, + dataset: &str, anchor_timestamp: DateTime, context_start_time: DateTime, context_end_time: DateTime, @@ -521,24 +524,24 @@ fn build_log_context_anchor_and_older_query( build_log_context_scope_filter(context_start_time, context_end_time, additional_filter); let predicate = build_log_context_anchor_and_older_predicate(&anchor_cursor); let order_by = build_log_context_order_by(match_fields); - let stream = quote_sql_identifier(stream); + let dataset = quote_sql_identifier(dataset); format!( - "SELECT * FROM {stream} WHERE ({scope_filter}) AND ({predicate}) {order_by} LIMIT {limit}" + "SELECT * FROM {dataset} WHERE ({scope_filter}) AND ({predicate}) {order_by} LIMIT {limit}" ) } #[allow(clippy::too_many_arguments)] fn build_log_context_neighbor_query( - stream: &str, + dataset: &str, context_start_time: DateTime, context_end_time: DateTime, cursor: &LogContextCursor, direction: LogContextSeekDirection, - wrap_for_display_order: bool, match_fields: &[LogContextMatchField], additional_filter: Option<&str>, - limit: u64, + limit: Option, + offset: Option, ) -> String { let scope_filter = build_log_context_scope_filter(context_start_time, context_end_time, additional_filter); @@ -547,21 +550,21 @@ fn build_log_context_neighbor_query( LogContextSeekDirection::Newer => build_log_context_reverse_order_by(match_fields), LogContextSeekDirection::Older => build_log_context_order_by(match_fields), }; - let stream = quote_sql_identifier(stream); - let query = format!( - "SELECT * FROM {stream} WHERE ({scope_filter}) AND ({predicate}) {order_by} LIMIT {limit}" - ); + let dataset = quote_sql_identifier(dataset); + let limit_clause = limit + .map(|limit| format!(" LIMIT {limit}")) + .unwrap_or_default(); + let offset_clause = offset + .map(|offset| format!(" OFFSET {offset}")) + .unwrap_or_default(); - if wrap_for_display_order { - let display_order_by = build_log_context_order_by(match_fields); - format!("SELECT * FROM ({query}) AS log_context_seek_page {display_order_by}") - } else { - query - } + format!( + "SELECT * FROM {dataset} WHERE ({scope_filter}) AND ({predicate}) {order_by}{limit_clause}{offset_clause}" + ) } fn build_log_context_cursor_query_payload( - stream: &str, + dataset: &str, context_start_time: DateTime, context_end_time: DateTime, match_fields: &[LogContextMatchField], @@ -572,27 +575,29 @@ fn build_log_context_cursor_query_payload( cursor: &LogContextCursor, direction: LogContextSeekDirection, ) -> LogContextQueryPayload { + let reverse_records = matches!(direction, LogContextSeekDirection::Newer); LogContextQueryPayload { query: build_log_context_neighbor_query( - stream, + dataset, context_start_time, context_end_time, cursor, direction, - matches!(direction, LogContextSeekDirection::Newer), match_fields, additional_filter, - limit, + Some(limit), + Some(0), ), start_time: start_time.to_string(), end_time: end_time.to_string(), send_null: false, + reverse_records, } } #[allow(clippy::too_many_arguments)] fn build_log_context_cursor_queries( - stream: &str, + dataset: &str, context_start_time: DateTime, context_end_time: DateTime, match_fields: &[LogContextMatchField], @@ -608,7 +613,7 @@ fn build_log_context_cursor_queries( .transpose()? .map(|cursor| { build_log_context_cursor_query_payload( - stream, + dataset, context_start_time, context_end_time, match_fields, @@ -626,7 +631,7 @@ fn build_log_context_cursor_queries( .transpose()? .map(|cursor| { build_log_context_cursor_query_payload( - stream, + dataset, context_start_time, context_end_time, match_fields, @@ -784,18 +789,18 @@ fn build_log_context_reverse_order_by(match_fields: &[LogContextMatchField]) -> #[tracing::instrument( name = "query_context.execute_anchor_count", - skip(sql, authorized_streams, tenant_id), + skip(sql, authorized_datasets, tenant_id), fields( tenant = tracing::field::Empty, start_time = %start_time, end_time = %end_time, sql_len = sql.len(), - authorized_stream_count = authorized_streams.len() + authorized_dataset_count = authorized_datasets.len() ) )] async fn execute_log_context_anchor_count( sql: &str, - authorized_streams: &[String], + authorized_datasets: &[String], start_time: &str, end_time: &str, tenant_id: &Option, @@ -814,7 +819,7 @@ async fn execute_log_context_anchor_count( }; let (records, _) = - get_records_and_fields_for_authorized_query(&query_request, authorized_streams, tenant_id) + get_records_and_fields_for_authorized_query(&query_request, authorized_datasets, tenant_id) .await?; let records = records.unwrap_or_default(); let rows = record_batches_to_json(&records)?; @@ -832,18 +837,18 @@ async fn execute_log_context_anchor_count( #[tracing::instrument( name = "query_context.execute_rows", - skip(payload, authorized_streams, tenant_id), + skip(payload, authorized_datasets, tenant_id), fields( tenant = tracing::field::Empty, start_time = %payload.start_time, end_time = %payload.end_time, sql_len = payload.query.len(), - authorized_stream_count = authorized_streams.len() + authorized_dataset_count = authorized_datasets.len() ) )] async fn execute_log_context_rows( payload: &LogContextQueryPayload, - authorized_streams: &[String], + authorized_datasets: &[String], tenant_id: &Option, ) -> Result, QueryError> { Span::current().record("tenant", tracing::field::debug(tenant_id)); @@ -860,7 +865,7 @@ async fn execute_log_context_rows( }; let (records, _) = - get_records_and_fields_for_authorized_query(&query_request, authorized_streams, tenant_id) + get_records_and_fields_for_authorized_query(&query_request, authorized_datasets, tenant_id) .await?; let records = records.unwrap_or_default(); let records: Vec = record_batches_to_json(&records)? @@ -1158,15 +1163,15 @@ mod tests { end, &cursor, LogContextSeekDirection::Newer, - true, &match_fields, None, - 500, + None, + None, ); assert!(previous_sql.contains("\"p_timestamp\" > TIMESTAMP '2026-06-17 10:15:42.123'")); assert!(previous_sql.contains("\"message\" < 'alpha'")); - assert!(previous_sql.contains("ORDER BY \"p_timestamp\" ASC, \"message\" DESC LIMIT 500")); - assert!(previous_sql.ends_with("ORDER BY \"p_timestamp\" DESC, \"message\" ASC")); + assert!(previous_sql.ends_with("ORDER BY \"p_timestamp\" ASC, \"message\" DESC")); + assert!(!previous_sql.contains("LIMIT")); assert!(!previous_sql.contains("OFFSET")); let next_sql = build_log_context_neighbor_query( @@ -1175,14 +1180,45 @@ mod tests { end, &cursor, LogContextSeekDirection::Older, - false, &match_fields, None, - 500, + None, + None, ); assert!(next_sql.contains("\"p_timestamp\" < TIMESTAMP '2026-06-17 10:15:42.123'")); assert!(next_sql.contains("\"message\" > 'alpha'")); - assert!(next_sql.contains("ORDER BY \"p_timestamp\" DESC, \"message\" ASC LIMIT 500")); + assert!(next_sql.ends_with("ORDER BY \"p_timestamp\" DESC, \"message\" ASC")); + assert!(!next_sql.contains("LIMIT")); assert!(!next_sql.contains("OFFSET")); + + let previous_payload = build_log_context_cursor_query_payload( + "logs", + start, + end, + &match_fields, + None, + "2026-06-17T10:14:00Z", + "2026-06-17T10:16:00Z", + 500, + &cursor, + LogContextSeekDirection::Newer, + ); + assert!(previous_payload.query.ends_with("LIMIT 500 OFFSET 0")); + assert!(previous_payload.reverse_records); + + let next_payload = build_log_context_cursor_query_payload( + "logs", + start, + end, + &match_fields, + None, + "2026-06-17T10:14:00Z", + "2026-06-17T10:16:00Z", + 500, + &cursor, + LogContextSeekDirection::Older, + ); + assert!(next_payload.query.ends_with("LIMIT 500 OFFSET 0")); + assert!(!next_payload.reverse_records); } } From 39294a0473fdb25981c8871b2eae5aa1df3e64d7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 19 Jun 2026 14:17:10 +0700 Subject: [PATCH 3/3] clippy fix --- src/handlers/http/query_context.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/handlers/http/query_context.rs b/src/handlers/http/query_context.rs index 76ba6d808..422d4d85e 100644 --- a/src/handlers/http/query_context.rs +++ b/src/handlers/http/query_context.rs @@ -440,6 +440,7 @@ WHERE {scope_filter} AND ({anchor_match_predicate})"# ) } +#[allow(clippy::too_many_arguments)] fn build_log_context_newer_query_payload( dataset: &str, anchor_timestamp: DateTime, @@ -473,6 +474,7 @@ fn build_log_context_newer_query_payload( } } +#[allow(clippy::too_many_arguments)] fn build_log_context_anchor_and_older_query_payload( dataset: &str, anchor_timestamp: DateTime, @@ -563,6 +565,7 @@ fn build_log_context_neighbor_query( ) } +#[allow(clippy::too_many_arguments)] fn build_log_context_cursor_query_payload( dataset: &str, context_start_time: DateTime,