Continuously Adaptive Continuous Queries Over Streams
Continuously Adaptive Continuous Queries (CACQ) over Streams by Samuel Madden, Mehul Shah, Joseph Hellerstein, and Vijayshankar Raman SIGMOD' 2002 1
CACQ Introduction l Proposed continuous query (CQ) systems are based on static plans – – – l CACQ insight: apply continuous adaptivity of eddies to continuous queries – – – 2 But, CQs are long running Initially valid assumptions less so over time Static optimizers at their worst! Avoid static optimization via dynamic operator ordering Process multiple queries simultaneously Explore sharing of work & storage
Motivating Applications l "Monitoring" queries look for recent events in streams – – – l Sensor data processing Stock analysis Router, web, or phone events In CACQ, queries over 'recent-history' l l 3 Only tuples currently entering the system Stored in in-memory data tables for time-windowed joins between streams
Continuous Queries l Long running, "standing queries", similar to trigger systems l Installed; continuously produce streamed results until removed l Lots of queries, over the same data sources – – – Opportunity for work sharing! Global query optimization problem: hard! Idea: adaptive heuristics not quite as hard? l – 4 Bad decisions are not final Future work: finding an optimal plan (adaptively)
Joins in CACQ l CACQ uses Parallel Pipelined Joins – l To avoid blocking Example: Symmetric (Windowed) Hash Join R. a S. b Probe Build Probe 5 R S
CACQ Query Model l SELECT R. a, S. b from R, S where R. c = x and R. d[n] = S. e[m] nn R S Probe into S 6 m
CACQ Main Points l l 7 Adaptivity via Eddies and Routing policies Tuple Lineage for flexible sharing of operators between queries Grouped Filter for efficiently computing selections over multiple queries State Modules (Ste. Ms) for enabling state sharing among joins
Step by Step Using Example 8 l First, just one query with only selections l Then, add multiple queries l Then, add joins to the picture
Eddies & Adaptivity l Eddies (Avnur & Hellerstein, SIGMOD 2000): Continuous Adaptivity l No static ordering of operators ("no query plan") l Routing policy dynamically determines in which order individual tuples go through operators l Encoding of state (lineage) with each tuple : – – 9 Use ready bits to track what to do next Use done bits to track what has been done
Eddies : Single Query, Single Source SELECT * • • FROM R Ready bits track what to do next • All 1's in single source WHERE R. a > 10 AND R. b < 15 (R. b < 15) (R. a > 10) Done bits track what has been done • Tuple can be output when all bits set R 2 R 21 R 2 Eddy R 21 a b 10 R 1 11 11 101110 Ready Done R R 2 R 1 a 15 5 b 025
Multiple Queries Q 1 R 1 Q 1 SELECT * FROM R WHERE R. a > 10 AND R. b < 15 a R 1 R. a > 10 a R. a > 20 R. a = 0 SELECT * FROM R Q 3 Grouped Filters R 1 R 1 WHERE R. a = 0 AND R. b <> 50 11 R R 1 a 5 b 25 b Q 3 R a R 1 1 1 R b b b R 1 R 1 R R R 1 Q 2 SELECT * FROM R WHERE R. a > 20 AND R. b = 25 Q 2 R. b < 15 R. b = 25 R. b <> 50 a b Q 1 Q 2 Q 3 10 10 10 Done Queries. Completed
Multiple Queries Q 1 SELECT * FROM R WHERE R. a > 10 AND R. b < 15 R 2 a R 2 FROM R Q 3 12 R 2 R. a = 0 Grouped Filters R 2 R 2 WHERE R. a = 0 AND R. b <> 50 R. a > 20 R 2 Q 2 SELECT * FROM R WHERE R. a > 20 AND R. b = 25 SELECT * R. a > 10 ba R b R. b < 15 R. b = 25 R. b <> 50 Q 2 Q 3 ba ab R ab R 2 R 2 R R R 1 1 Reorder Operators! R 2 a 15 b 0 a b Q 1 Q 2 Q 3 0 1 1 0 1 0 Done Queries. Completed
Tuple & Query Data Structures l Per tuple bitmaps: – queries. Completed l – What operators can be applied to this tuple? Per query bitmaps: – completion. Mask l 13 What operators have been applied to this tuple? ready l l Bit What operators must be applied to output a tuple to this query? Value Queries. Completed Query 1 1 Query 2 0 done l – What queries has this tuple been output to or rejected by? Tuple [10, 1100, …] Done S. a Index 1 S. b Index 1 R. a – S. b Join 0 UDF (R. a) 0 Query [0110] Bit Value completion. Mask S. a Index 0 S. b Index 1 R. a – S. b Join 1 UDF (R. a) 0
Outputting Tuples l Store a completion. Mask bitmap for each query – – l SELECT * FROM R WHERE R. a > 10 AND R. b < 15 To determine if a tuple t can be output to query q: Q 2 – – l One bit per operator Set if operator in query completion. Masks Q 1 Eddy ANDs q's completion. Mask with t's done bits Output only if q's bit not set in t's queries. Completed bits Every time a tuple returns from an operator a b c d Q 1 1 1 0 0 Q 2 0 1 1 1 SELECT * FROM R WHERE R. b < 15 AND R. c <> 5 AND R. d = 10 Tuple a b c d Q 1 Q 2 1 1 0 0 10 0 Done QC completion. Masks Q 1: 1100 & Done == 1100 && Queries. Completed[0] == 0 14 Q 2: 0111 & Done == 0111
Grouped Filter l Use binary trees to efficiently index range predicates – – l When tuple arrives – – l Two trees (LT & GT) per attribute Insert constant Scan to right (for GT) or left (for LT) of the tuple-attribute in the tree Those are the queries that the tuple does not pass Hash tables to index equality, inequality predicates Greater-than tree over S. a 1 7 S. a > 1 11 S. a > 7 S. a > 11 15 >1 Q 1 >7 Q 2 >11 Q 3 8
Work Sharing via Tuple Lineage Q 1: SELECT * FROM s WHERE A, B, C Q 2: SELECT * FROM s WHERE A, B, D 16
Work Sharing via Tuple Lineage Q 1: SELECT * FROM s WHERE A, B, C Conventional Queries Query 1 Query 2 A s. BC B sc C s 17 Intersection of CD goes through AB an extra time! A s. BD B D s Data Stream S CACQ Lineage (Queries 0 or 1 | 0 or 1 s Completed) CDBA QC Enables Any A Ordering! 0 or 1 | 0 or 1 QC s. CDB 0 or 1 | 0 or 1 QC s. CD s. C 0|0 QC s C 1|1 QC Reject? B s. D C D Q 2: SELECT * FROM s WHERE A, B, D D 0 or 1 | 0 QC Data Stream S Shared Subexpr. Q 1 Q 2 C D s. AB A AB must be applied first! s. B B s Data Stream S
Tradeoff: Overhead vs. Shared Work l Overhead in additional bits per tuple – l Trading accounting overhead for work sharing – l 18 Bit / query / tuple have significant effects 100 bits / tuple allows a tuple to be processed once, not 100 times Reduce overhead by not keeping state about operators tuple will never pass through
Joins in CACQ l l Use symmetric hash join to avoid blocking Use State Modules (Ste. Ms) to share storage between joins with a common base relation R. a S. b Probe Build Probe 19 R S
Processing Joins Via State Modules l l Idea: Share join indices over base relations State Modules (Ste. Ms) are: – – – l Unary indexes (e. g. hash tables, trees) Built on fly (as data arrives) Scheduled by CACQ as first class operators T. c Probe R. a Build Probe S. b Build Probe Based on symmetric hash join R T S Query 2 20 R. a = S. b Query 1 S. b = T. c
Routing Policies 22 l Machinery so far assures correctness l Routing policy responsible for performance l Consult the policy to determine where to route every tuple that: – Enters the system – Returns from an operator
Eddies with Lottery Scheduling l Operator gets 1 ticket when it takes a tuple – l Operator loses a ticket when it returns a tuple – l Favor operators that drop tuples (low selectivity) Winner? – l Favor operators that run fast (low cost) Large number of tickets == measure of goodness Lottery Scheduling: – – When two operators vie for the same tuple, hold a lottery Never let any operator go to zero tickets l 23 Support occasional random "exploration"
Routing Policies For CACQ l Give more tickets to operators shared by multiple queries (e. g. , grouped filters) l When a shared operator outputs a tuple, charge it multiple tickets l Intuition: cardinality reducing shared operators reduce global work more than unshared operators – 24 Not optimizing for the throughput of a single query!
Evaluation l Java implementation on top of Telegraph QP – l Server Platform – – l Linux 2. 4. 10 Pentium III 733, 756 MB RAM Queries posed from separate workstation – 25 4, 000 new lines of code in 75, 000 line codebase Output suppressed
Results: Routing Policy 26 All attributes uniformly Query distributed over [0, 100] 1 From S select index where a > 90 2 From S select index where a > 90 and b > 70 3 From S select index where a > 90 and b > 70 and c > 50 4 From S select index where a > 90 and b > 70 and c > 50 and d > 30 5 From S select index where a > 90 and b > 70 and c > 50 and d > 30 and e > 10
Experiment: Increased Scalability 27 Workload, Per Query: 1 -5 randomly selected range predicates of form 'attr > x' over 5 attributes. Predicates from the uniform distribution [0, 100]. 50% chance of predicate over each attribute.
Conclusion l l CACQ: sharing and adaptivity for processing monitoring queries over data streams Features – Adaptivity l – Work sharing via tuple lineage l – – l Without constraining the available plans Computation sharing via grouped filter Storage sharing via Ste. Ms Future Work – – – 29 Adapt without costly multi-query reoptimization More sophisticated routing policies Batching & query grouping Better integration with historical results
mullawirraburkaheact1966.blogspot.com
Source: https://slidetodoc.com/continuously-adaptive-continuous-queries-cacq-over-streams-by/
0 Response to "Continuously Adaptive Continuous Queries Over Streams"
Post a Comment