pallet_permission0/permission/
stream.rs1use polkadot_sdk::{
2 frame_support::traits::{
3 Currency, ExistenceRequirement, Imbalance, ReservableCurrency, WithdrawReasons,
4 },
5 frame_system,
6 sp_arithmetic::FixedU128,
7 sp_runtime::traits::{Saturating, Zero},
8};
9
10use super::*;
11
/// Identifier of a stream; an alias for [`H256`].
pub type StreamId = H256;
14
/// Scope data of a stream permission: who receives funds, where the funds
/// come from, and when they are distributed.
#[derive(Encode, Decode, CloneNoBound, PartialEq, TypeInfo, MaxEncodedLen, DebugNoBound)]
#[scale_info(skip_type_params(T))]
pub struct StreamScope<T: Config> {
    /// Recipient accounts mapped to their weight. During distribution each
    /// recipient receives a share proportional to its weight over the sum of
    /// all weights.
    pub recipients: BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>,
    /// Source of the distributed funds: percentages of streams or a fixed amount.
    pub allocation: StreamAllocation<T>,
    /// Rule deciding when an automatic distribution is triggered.
    pub distribution: DistributionControl<T>,
    /// When `false`, `do_accumulate_streams` skips this permission.
    pub accumulating: bool,
    /// Set of accounts, bounded by `MaxControllersPerPermission`.
    /// NOTE(review): presumably the accounts allowed to manage recipients;
    /// the usage is not visible in this file - confirm against callers.
    pub recipient_managers: BoundedBTreeSet<T::AccountId, T::MaxControllersPerPermission>,
    /// Set of accounts, bounded by `MaxControllersPerPermission`.
    /// NOTE(review): presumably the accounts allowed to change recipient
    /// weights; the usage is not visible in this file - confirm against callers.
    pub weight_setters: BoundedBTreeSet<T::AccountId, T::MaxControllersPerPermission>,
}
34
35impl<T: Config> StreamScope<T> {
36 pub(super) fn cleanup(
37 self,
38 permission_id: H256,
39 last_executed: &Option<BlockNumberFor<T>>,
40 delegator: &T::AccountId,
41 ) {
42 match self.allocation {
43 StreamAllocation::Streams(streams) => {
44 for stream in streams.keys() {
45 AccumulatedStreamAmounts::<T>::remove((delegator, stream, &permission_id));
46 }
47 }
48 StreamAllocation::FixedAmount(amount) if last_executed.is_none() => {
49 T::Currency::unreserve(delegator, amount);
50 }
51 _ => {}
52 }
53 }
54}
55
/// Describes where the funds distributed by a stream permission come from.
#[derive(Encode, Decode, CloneNoBound, PartialEqNoBound, TypeInfo, MaxEncodedLen, DebugNoBound)]
#[scale_info(skip_type_params(T))]
pub enum StreamAllocation<T: Config> {
    /// Per-stream percentages. `do_accumulate_streams` applies the percentage
    /// (rounded down) to the incoming amount of the matching stream.
    Streams(BoundedBTreeMap<StreamId, Percent, T::MaxStreamsPerPermission>),
    /// A fixed balance. `do_distribute_stream` unreserves and withdraws it
    /// from the delegator, and skips the permission once it has executed.
    FixedAmount(BalanceOf<T>),
}
65
/// Rule used by `do_auto_distribution` to decide whether to distribute.
#[derive(Encode, Decode, CloneNoBound, PartialEq, TypeInfo, MaxEncodedLen, DebugNoBound)]
#[scale_info(skip_type_params(T))]
pub enum DistributionControl<T: Config> {
    /// Never triggered by `do_auto_distribution`.
    Manual,
    /// Triggered when the accumulated amount is greater than or equal to the
    /// contained threshold.
    Automatic(BalanceOf<T>),
    /// Triggered once the current block is past the contained block, unless
    /// an execution at or after that block is already recorded.
    AtBlock(BlockNumberFor<T>),
    /// Triggered when at least the contained number of blocks has elapsed
    /// since the last execution (or since creation if never executed).
    Interval(BlockNumberFor<T>),
}
78
79pub(crate) fn do_accumulate_streams<T: Config>(
81 agent: &T::AccountId,
82 stream_id: &StreamId,
83 imbalance: &mut NegativeImbalanceOf<T>,
84) {
85 let initial_balance = imbalance.peek();
86 let total_initial_amount =
87 FixedU128::from_inner(initial_balance.try_into().unwrap_or_default());
88 if total_initial_amount.is_zero() {
89 return;
90 }
91
92 let streams = AccumulatedStreamAmounts::<T>::iter_prefix((agent, stream_id));
93 for (permission_id, accumulated) in streams {
94 let Some(contract) = Permissions::<T>::get(permission_id) else {
95 continue;
96 };
97
98 let PermissionScope::Stream(StreamScope {
101 allocation: StreamAllocation::Streams(streams),
102 accumulating,
103 ..
104 }) = contract.scope
105 else {
106 continue;
107 };
108
109 if !accumulating {
110 continue;
111 }
112
113 let Some(percentage) = streams.get(stream_id) else {
114 continue;
115 };
116
117 let delegated_amount = percentage.mul_floor(total_initial_amount.into_inner());
118 if delegated_amount.is_zero() {
119 continue;
120 }
121
122 let delegated_amount = imbalance
123 .extract(delegated_amount.try_into().unwrap_or_default())
124 .peek();
125
126 AccumulatedStreamAmounts::<T>::set(
127 (agent, stream_id, &permission_id),
128 Some(accumulated.saturating_add(delegated_amount)),
129 );
130
131 Pallet::<T>::deposit_event(Event::AccumulatedEmission {
132 permission_id,
133 stream_id: *stream_id,
134 amount: delegated_amount,
135 });
136 }
137}
138
139pub(crate) fn do_auto_distribution<T: Config>(
140 stream_scope: &StreamScope<T>,
141 permission_id: H256,
142 current_block: BlockNumberFor<T>,
143 contract: &PermissionContract<T>,
144) -> DispatchResult {
145 match stream_scope.distribution {
146 DistributionControl::Automatic(threshold) => {
147 let accumulated = match &stream_scope.allocation {
148 StreamAllocation::Streams(streams) => streams
149 .keys()
150 .filter_map(|id| {
151 AccumulatedStreamAmounts::<T>::get((&contract.delegator, id, permission_id))
152 })
153 .fold(BalanceOf::<T>::zero(), |acc, e| acc.saturating_add(e)), StreamAllocation::FixedAmount(amount) => *amount,
155 };
156
157 if accumulated >= threshold {
158 do_distribute_stream::<T>(permission_id, contract, DistributionReason::Automatic)?;
159 }
160 }
161
162 DistributionControl::AtBlock(target_block) if current_block > target_block => {
163 if contract
168 .last_execution()
169 .is_some_and(|last_execution| last_execution >= target_block)
170 {
171 return Ok(());
172 }
173
174 do_distribute_stream::<T>(permission_id, contract, DistributionReason::Automatic)?;
175 }
176
177 DistributionControl::Interval(interval) => {
178 let last_execution = contract.last_execution.unwrap_or(contract.created_at);
179 if current_block.saturating_sub(last_execution) < interval {
180 return Ok(());
181 }
182
183 do_distribute_stream::<T>(permission_id, contract, DistributionReason::Automatic)?;
184 }
185
186 _ => {}
188 }
189
190 Ok(())
191}
192
/// Why a distribution happened; carried in the `StreamDistribution` event.
#[derive(
    Encode, Decode, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, TypeInfo, MaxEncodedLen,
)]
pub enum DistributionReason {
    /// The distribution was triggered by `do_auto_distribution`.
    Automatic,
    /// NOTE(review): presumably an explicitly requested distribution; no use
    /// of this variant is visible in this file - confirm against callers.
    Manual,
}
200
201pub(crate) fn do_distribute_stream<T: Config>(
203 permission_id: PermissionId,
204 contract: &PermissionContract<T>,
205 reason: DistributionReason,
206) -> DispatchResult {
207 let PermissionScope::Stream(stream_scope) = &contract.scope else {
208 return Ok(());
209 };
210
211 let total_weight =
212 FixedU128::from_u32(stream_scope.recipients.values().map(|w| *w as u32).sum());
213 if total_weight.is_zero() {
214 trace!("permission {permission_id:?} does not have enough target weight");
215 return Ok(());
216 }
217
218 match &stream_scope.allocation {
219 StreamAllocation::Streams(streams) => {
220 let streams = streams.keys().filter_map(|stream_id| {
221 let acc = AccumulatedStreamAmounts::<T>::get((
222 &contract.delegator,
223 stream_id,
224 permission_id,
225 ))?;
226
227 AccumulatedStreamAmounts::<T>::set(
230 (&contract.delegator, stream_id, permission_id),
231 Some(Zero::zero()),
232 );
233
234 if acc.is_zero() {
235 None
236 } else {
237 Some((stream_id, T::Currency::issue(acc)))
241 }
242 });
243
244 for (stream, mut imbalance) in streams {
245 do_distribute_to_targets(
246 &mut imbalance,
247 permission_id,
248 stream_scope,
249 Some(stream),
250 total_weight,
251 reason,
252 );
253
254 let remainder = imbalance.peek();
255 if !remainder.is_zero() {
256 AccumulatedStreamAmounts::<T>::mutate(
257 (&contract.delegator, stream, permission_id),
258 |acc| *acc = Some(acc.unwrap_or_default().saturating_add(remainder)),
259 );
260 }
261 }
262 }
263 StreamAllocation::FixedAmount(amount) => {
264 if contract.last_execution().is_some() {
265 return Ok(());
267 }
268
269 let _ = T::Currency::unreserve(&contract.delegator, *amount);
271 let mut imbalance = T::Currency::withdraw(
272 &contract.delegator,
273 *amount,
274 WithdrawReasons::TRANSFER,
275 ExistenceRequirement::KeepAlive,
276 )
277 .unwrap_or_else(|_| NegativeImbalanceOf::<T>::zero());
278
279 do_distribute_to_targets(
280 &mut imbalance,
281 permission_id,
282 stream_scope,
283 None,
284 total_weight,
285 reason,
286 );
287 }
288 }
289
290 if let Some(mut contract) = Permissions::<T>::get(permission_id) {
291 contract.tick_execution(<frame_system::Pallet<T>>::block_number())?;
292 Permissions::<T>::set(permission_id, Some(contract));
293 }
294
295 Ok(())
296}
297
298fn do_distribute_to_targets<T: Config>(
299 imbalance: &mut NegativeImbalanceOf<T>,
300 permission_id: PermissionId,
301 stream_scope: &StreamScope<T>,
302 stream: Option<&StreamId>,
303 total_weight: FixedU128,
304 reason: DistributionReason,
305) {
306 let initial_balance = imbalance.peek();
307 let total_initial_amount =
308 FixedU128::from_inner(initial_balance.try_into().unwrap_or_default());
309 if total_initial_amount.is_zero() {
310 trace!("no amount to distribute for permission {permission_id:?} and stream {stream:?}");
311 return;
312 }
313
314 for (target, weight) in stream_scope.recipients.iter() {
315 let target_weight = FixedU128::from_u32(*weight as u32);
316 let target_amount = total_initial_amount
317 .saturating_mul(target_weight)
318 .const_checked_div(total_weight)
319 .unwrap_or_default();
320
321 if target_amount.is_zero() {
322 continue;
323 }
324
325 let target_amount =
326 BalanceOf::<T>::try_from(target_amount.into_inner()).unwrap_or_default();
327 let mut imbalance = imbalance.extract(target_amount);
328
329 if let Some(stream) = stream {
330 do_accumulate_streams::<T>(target, stream, &mut imbalance);
332 }
333
334 T::Currency::resolve_creating(target, imbalance);
335
336 Pallet::<T>::deposit_event(Event::StreamDistribution {
337 permission_id,
338 stream_id: stream.cloned(),
339 recipient: target.clone(),
340 amount: target_amount,
341 reason,
342 });
343 }
344}