
    9j<                     \   U d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlZd dlm	Z	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZmZmZmZmZmZ d dlmZm Z m!Z!m"Z"m#Z#m$Z$ d d	l%m&Z& d d
l'm(Z(  ejR                  e*      Z+ejX                  e-d<   ddgZ. G d de
      Z/ G d de	      Z0y)    N)Any)FileSystemReaderFileSystemWriter)consolidate_safetensors_files)_gen_file_name_HFStorageInfo_metadata_fnCUSTOM_METADATA_KEYSAVED_OFFSETS_KEYSHARDED_DIR_NAMESUFFIX)SerializationFormat)ChunkStorageMetadataMetadataMetadataIndexStorageMetaTensorPropertiesTensorStorageMetadata)LoadPlanLoadPlannerReadItemSavePlanSavePlanner	WriteItem)WriteResult)FutureloggerHuggingFaceStorageWriterHuggingFaceStorageReaderc                       e Zd ZdZ	 	 	 	 	 ddedeeef   dz  dedededed	df fd
Zde	e
   d	e	e
   fdZde
ded	ee	e      f fdZdede	e	e      d	dfdZdeeef   dz  de	e   d	eee	e   f   fdZed	efd       Z xZS )r   zP
    A writer that writes to storage in the huggingface safetensors format.
    Npathfqn_to_index_mappingthread_countsave_distributedenable_consolidationthread_count_consolidationreturnc                 6   t         |   |t        j                  |       || _        || _        || _        d| _        | j                  rIt        | j                        | _        | j                  j                  | j                  t              | _	        || _        y)a  
        Initialize the huggingface writer pointing to path.

        Args:
            path: directory where the checkpoint will be read from.
            fqn_to_index_mapping: A mapping from tensor FQN to the index of the file that the tensor should be written to.
                              Indices are from 1 to N, where N is the number of files. If not provided,
                              the tensors will be written to a single file. If none, then all the tensors on the
                              same rank will be written to the same file.
            thread_count: Number of threads to use to write distributed checkpoint. Default to 1.
            save_distributed: If True, save the checkpoint using distributed APIs where every rank saves its own shard.
                        Default is False which assumes rank-0 checkpointing of the full state_dict.
            enable_consolidation: If True, consolidate the sharded checkpoint after saving. The sharded tensors will be
                                saved to path/sharded and the full tensors will be saved to path. Default to False.
            thread_count_consolidation: Number of threads to use for parallel processing of saving data
                                to consolidated output files. Default to 1.
        )r!   serialization_formatr#   N)super__init__r   SAFETENSORSr"   r$   r%   consolidated_output_pathstrr!   fsconcat_pathr   r&   )selfr!   r"   r#   r$   r%   r&   	__class__s          g/media/conek/DATA/Code/OCR/venv/lib/python3.12/site-packages/torch/distributed/checkpoint/hf_storage.pyr+   z!HuggingFaceStorageWriter.__init__6   s    6 	!4!@!@% 	 	

 <P!&6*>!48%$$,/		ND)++DII7GHDI*D'    plansc                     g }t        |d      D ]Y  \  }}i }| j                  | j                  |d<   | j                  r||d<   |j                  t	        j
                  ||             [ |S )N   )startr"   shard_index)storage_data)	enumerater"   r$   appenddataclassesreplace)r1   r5   	new_plansiplanr:   s         r3   prepare_global_planz,HuggingFaceStorageWriter.prepare_global_plan_   s{    	 a0 	SGAt+-L((47;7P7P34$$./]+[00LQR	S r4   rA   plannerc                 &   t        |j                        dk(  rt               }|j                  g        |S |j                  }d }d }d|v r|d   }d|v r|d   }| j                  ||j                        }|t        |j                               nd}t        j                         }	|j                         D ]J  \  }
}t        |
||      }|	j                  | j                  j                  | j                  |      ||f       L t        | A  ||	      S )Nr   r"   r9   r7   )lenitemsr   
set_resultr:   _split_by_storage_planmaxvaluesqueueQueuer   putr/   r0   r!   r*   _write_data)r1   rA   rC   futr:   storage_planr9   bucketshighest_index
file_queue
file_indexwrite_items	file_namer2   s                r3   
write_dataz#HuggingFaceStorageWriter.write_datal   s   
 tzz?a (CNN2J (,'8'8.2"&!\1'(>?LL(&}5K--lDJJG6B6NL//12TU"'++-
'.}} 	#J&z=+NINN$$TYY	:I{S	 w"7J77r4   metadataresultsc                    | j                   r"| j                  st        j                  d       y | j                   r|| j                  | j                  n-t
        j                  |j                  j                         d      }t        t        | j                        | j                  | j                  |      S i }i }d}|D ]z  }|j                  |D ci c]-  }|j                  j                   |j"                  j$                  / c}       |t'        |D cg c]  }|j"                  j(                   c}      z  }| d|i|d<   ||d<   | j*                  j-                  | j                  t.               }	| j*                  j1                  |	d      5 }
t3        j4                  ||
d	
       d d d        y c c}w c c}w # 1 sw Y   y xY w)Nz4Not consolidating sharded checkpoint in finish step.r7   )	input_dir
output_dirnum_threadsr"   r   
total_sizerX   
weight_mapw   )indent)r$   r%   r   infor"   dictfromkeysstate_dict_metadatakeysr   r.   r!   r-   r&   updateindexfqnr:   relative_pathsumlengthr/   r0   r	   create_streamjsondump)r1   rX   rY   r"   metadata_to_write
storage_mdr^   wr_listwrmetadata_pathmetadata_files              r3   finishzHuggingFaceStorageWriter.finish   s     )B)B
 KKNO   ,,8 ))]]8#?#?#D#D#FJ ! 1dii.88 ;;%9	  

 	JGGNOr<<<O #H"r55HIIJ		J
 *6z(B*%*4,'++DII,IWW""=#6 	B-II'qA	B 	B PH
	B 	Bs   2F1
F6F;;GrP   rF   c                     |d|iS i }|D ]<  }|j                   j                  }||   }||vr|g||<   )||   j                  |       > |S )Nr7   )ri   rj   r<   )r1   rP   rF   rQ   itemkeyidxs          r3   rH   z/HuggingFaceStorageWriter._split_by_storage_plan   sk     u: 	*D**..Cs#C'! $v##D)	* r4   c                     t         S N)r	   )r1   s    r3   ru   z&HuggingFaceStorageWriter.metadata_path   s    r4   )Nr7   FFr7   )__name__
__module____qualname____doc__r.   rd   intboolr+   listr   rB   r   r   r   rW   r   rw   r   rH   propertyru   __classcell__r2   s   @r3   r   r   1   s>    7;!&%**+'E'E #38nt3'E 	'E
 'E #'E %('E 
'ERh DN 88 8 
[!	"	8>%Bx %B$tK7H2I %Bd %BN cNT1:>y/	c4	?"	#& s  r4   c                        e Zd ZdZddededdf fdZdededdfd	Z	d
e
j                  de
j                  deddfdZdededed   fdZdefdZ xZS )r   zQ
    A reader that reads a checkpoint in the huggingface safetensors format.
    r!   r#   r'   Nc                 4    t         |   |       || _        y)z
        Initialize the huggingface reader pointing to path.

        Args:
            path: directory where the checkpoint will be read from.
            thread_count: Number of threads to use to read distributed checkpoint. Default to 1.
        )r!   N)r*   r+   r#   )r1   r!   r#   r2   s      r3   r+   z!HuggingFaceStorageReader.__init__   s     	d#(r4   reqrC   c           	         t        d t        |j                  |j                        D              }|j	                  |j
                  j                        |   }|j                  |      j                         }|j                         |j                         k7  r:t        d|j
                   d|j                          d|j                                |j                  |       |j                  ||       y)z1Helper function to process a single read request.c              3   @   K   | ]  \  }}t        |||z           y wr}   )slice).0offsetrm   s      r3   	<genexpr>zAHuggingFaceStorageReader._process_read_request.<locals>.<genexpr>   s'      
 &&6/*
s   zreq z mismatch sizes z vs N)tuplezipstorage_offsetslengths	get_slicestorage_indexrj   resolve_tensordetachsizeAssertionErrorcopy_commit_tensor)r1   fr   rC   slicestensortarget_tensors          r3   _process_read_requestz.HuggingFaceStorageReader._process_read_request   s      
"%c&9&93;;"G
 
 S..223F;..s3::<6;;=0 s(())9-:L:L:N9OtTZT_T_TaSbc  	F#c=1r4   rS   result_queuec                    ddl m} 	 	 |j                         \  }} ||d      5 }|D ]  }| j                  |||        	 d d d        |j	                  d       S# 1 sw Y   xY w# t
        j                  $ r Y y w xY w)Nr   	safe_openTptfilename	framework)safetensorsr   
get_nowaitr   rM   rK   Empty)	r1   rS   r   rC   r   rV   reqsr   r   s	            r3   _read_files_from_queuez/HuggingFaceStorageReader._read_files_from_queue   s     	*	","7"7"9	4	TB Da# D221c7CDD   & D D {{ 		s(   A( AA( A%!A( (A>=A>rA   c                 T   ddl m} i }|j                  D ]H  }| j                  |j                     }|j
                  }|j                  |g       j                  |       J | j                  dk  st        |      dk  rG|j                         D ]3  \  }} ||d      5 }	|D ]  }
| j                  |	|
|        	 d d d        5 nt        j                         }t        j                         }|j                         D ]  \  }}|j                  ||f        g }t        | j                  t        |            }t        |      D ]G  }t!        j"                  | j$                  |||f      }|j'                          |j                  |       I |D ]  }|j)                           d}	 	 |j+                          |dz  }t1               }|j3                  d        |S # 1 sw Y   XxY w# t        j,                  $ r Y nw xY w|t        |      k7  sRt/        d| dt        |             )	Nr   r   r7   r   r   )targetargszNot all files were processed: z out of )r   r   rF   r:   r   rk   
setdefaultr<   r#   rE   r   rK   rL   rM   minrange	threadingThreadr   r8   joinr   r   r   r   rG   )r1   rA   rC   r   per_file	read_itemitem_mdrV   r   r   r   rS   r   threadsr]   _tprocessed_countrO   s                      r3   	read_dataz"HuggingFaceStorageReader.read_data   s8   ).0 	AI&*&7&7	8O8O&PG--I	2.55i@	A
 !S]a%7#+>>#3 D	4	TB Da# D221c7CDD DD ',kkmJ(-L $,>>#3 2	4	4012 Gd//X?K;' "$$66$lG< 	q!"    O ++-#q(O  ht
WD DB ;;  #h-/$4_4EXcRZm_] s   G'G( G%	(G>=G>c                    ddl m} ddlm} i }i }g }| j                  j                  | j                        D ])  }|j                  t              s|j                  |       + |D ]~  } ||d      5 }|j                         }	|j                         }
d }|
r=|
j                  t              r(t        j                  |
j                  t                    }|	D ]   }|j!                  |      j#                         }|j!                  |      j%                         }|||   t&           }ndgt)        |      z  }||vrt+        t-         ||            t/        j0                  t3        ||      D cg c]
  \  }}||z    c}}      t5        t/        j0                        t/        j0                  |            g      ||<   n||   j6                  j                  t5        t/        j0                  |      t/        j0                  |      	             t9        ||   j:                        }t=        t)        |            D ]  }t?        ||   ||   ||   z         ||<    t/        j0                  |      ||   _        |tA        |||   t&           
      }ntA        |dgt)        |      z  
      }tC        |t/        j0                  |       ||            ||<    	 d d d         tE        ||      }tG        |dd       tI               |_%        | jL                  |jJ                  _&        |S c c}}w # 1 sw Y   xY w)Nr   r   )	_getdtyper   )r   )dtype)offsetssizes)
propertiesr   chunks)r   )rj   r   )rk   shaper   )rf   r:   storage_meta)'r   r   safetensors.torchr   r/   lsr!   endswithr   r<   rg   rX   getr
   ro   loadsr   	get_shape	get_dtyper   rE   r   r   torchSizer   r   r   r   r   r   rI   r   r   r   getattrr   r   load_id)r1   r   r   rf   r:   safetensors_filesfilesafetensor_filer   rg   extra_metadatadcp_sharding_inforz   r   r   r   savedr   r@   metadata_indexrX   s                        r3   read_metadataz&HuggingFaceStorageReader.read_metadata9  s   )/@B<>GGJJtyy) 	/D}}V$!((.	/  1 7	O?d; 6qvvx!"$(!!n&8&89L&M(,

&**+>?)%   ,CKK,668EKK,668E(4!23!78I!J"#s5z!1"553H'7i>N'O!&EHPVEW XME6 X" !5,1JJv,>*/**U*;!"$4+C0 ,C077>>0 %

6 2%**U:K
  $$7$<$A$AB!&s4y!1 IA&)$q'58fQi3G&HDGI8=

48H+C05 )4)6 #,=c,BCT,U* *73sSQVZGW)X3A&5#jj/'.4L0Q,6 67	r  3%

 8^T2:$/MH!(,%U !Y/6 6s    4C<L40L.?EL4.L44L>	)r7   )r~   r   r   r   r.   r   r+   r   r   r   rK   rL   r   r   r   r   r   r   r   r   s   @r3   r   r      s    
)S 
) 
)D 
)2H 2{ 2t 2$KK kk 	
 
$7h 7 7 7tNx Nr4   )1r=   ro   loggingrK   r   typingr   r   torch.distributed.checkpointr   r   8torch.distributed.checkpoint._consolidate_hf_safetensorsr   &torch.distributed.checkpoint._hf_utilsr   r   r	   r
   r   r   r   'torch.distributed.checkpoint.filesystemr   %torch.distributed.checkpoint.metadatar   r   r   r   r   r   $torch.distributed.checkpoint.plannerr   r   r   r   r   r   $torch.distributed.checkpoint.storager   torch.futuresr   	getLoggerr~   r   Logger__annotations____all__r   r    r4   r3   <module>r      s           K   H   =   +**84 4%'A
BV/ Vr}/ }r4   